### Spark basic operator & dataframe

In [None]:
# Dataframe function

# df.printSchema() : 스키마 정보를 출력
# df.schema : StructType 스키마를 반환
# df.columns : 컬럼명 정보를 반환
# df.show(n) : 데이터 n 개를 출력
# df.first() : 데이터 프레임의 첫 번째 Row 를 반환
# df.head(n) : 데이터 프레임의 처음부터 n 개의 Row 를 반환
# df.createOrReplaceTempView : 임시 뷰 테이블을 생성
# df.union(newdf) : 데이터프레임 간의 유니온 연산을 수행
# df.limit(n) : 추출할 로우수 제한
# df.repartition(n) : 파티션 재분배, 셔플발생
# df.coalesce() : 셔플하지 않고 파티션을 병합 마지막 스테이지의 reduce 수가 줄어드는 효과로 성능저하에 유의해야 함
# df.collect() : 모든 데이터 수집, 반환
# df.take(n) : 상위 n개 로우 반환

In [None]:
# Column function

# df.select : 컬럼이나 표현식 사용
# df.selectExpr : 문자열 표현식 사용 = df.select(expr())
# df.withColumn(컬럼명, 표현식) : 컬럼 추가, 비교, 컬럼명 변경
# df.withColumnRenamed(old_name, new_name) : 컬럼명 변경
# df.drop() : 컬럼 삭제
# df.where : 로우 필터링
# df.filter : 로우 필터링
# df.sort, df.orderBy : 정렬
# df.sortWithinPartitions : 파티션별 정렬

In [None]:
# ETC

# expr("someCol - 5") : 표현식
# lit() : 리터럴
# cast() : 컬럼 데이터 타입 변경
# distinct() : unique row
# desc(), asc() : 정렬 순서

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Data Engineer Basic Day3") \
    .config("spark.dataengineer.intermediate.day3", "tutorial-2") \
    .getOrCreate()

### Cover the spark dataa

#### 1) Create & use the table from file

In [None]:
df = spark.read.json("data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("2015_summary")

sql_result = spark.sql("SELECT * FROM 2015_summary").show(5)

#### 2) Select column (select, selectExpr)

In [None]:
from pyspark.sql.functions import *

df.select(upper(col("DEST_COUNTRY_NAME")), "ORIGIN_COUNTRY_NAME").show(2)
df.selectExpr("upper(DEST_COUNTRY_NAME)", "ORIGIN_COUNTRY_NAME").show(2)

In [None]:
df.selectExpr("DEST_COUNTRY_NAME as newColmnName", "DEST_COUNTRY_NAME").show(2)

df.selectExpr("*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)

#### 3) Use constant value

In [None]:
from pyspark.sql.functions import lit

# df.select(expr("*"), lit(1).alias("One")).show(2)
df.selectExpr("*", "1 as One").show(2)

#### 4) Add columns

In [None]:
df.withColumn("numberOne", lit(1)).show(2)

In [None]:
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)

In [None]:
before = df
before.printSchema()

after = before.withColumn("Destination", expr("DEST_COUNTRY_NAME"))
after.printSchema()

#### 5) Change column name

In [None]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "Destination").columns

#### 6) Drop columns

In [None]:
df.printSchema()
df.drop("ORIGIN_COUNTRY_NAME").columns

In [None]:
spark.conf.set('spark.sql.caseSensitive', True)
caseSensitive = df.drop("dest_country_name")
caseSensitive.printSchema()

spark.conf.set('spark.sql.caseSensitive', False)
caseInsensitive = df.drop("dest_country_name")
caseInsensitive.printSchema()

In [None]:
df.printSchema()
df.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns

#### 7) Change column data type

In [None]:
df.printSchema()

int2str = df.withColumn("str_count", col("count").cast("string"))
int2str.show(5)
int2str.printSchema()

str2int = int2str.withColumn("int_count", col("str_count").cast("int"))
str2int.show(5)
str2int.printSchema()

#### 8) Record filtering

In [None]:
df.where("count < 2").show(2)
df.filter("count < 2").show(2)

df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)

#### 9) Distinct value

In [None]:
print(df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count())
print(df.select("ORIGIN_COUNTRY_NAME").distinct().count())

#### 10) Sort

In [None]:
df.sort("count").show(2)
df.orderBy("count", "DEST_COUNTRY_NAME").show(2)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(2)

In [None]:
from pyspark.sql.functions import *
print("# asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last 메서드로 null의 정렬 순서를 지정")
df.sort("DEST_COUNTRY_NAME").show(1)
df.sort(df["DEST_COUNTRY_NAME"].asc_nulls_first()).show(1)
df.sort(df.DEST_COUNTRY_NAME.asc_nulls_first()).show(1)

In [None]:
from pyspark.sql.functions import desc, asc
df.orderBy(df["count"].desc()).show(2)
df.orderBy(df.ORIGIN_COUNTRY_NAME.desc(), df.DEST_COUNTRY_NAME.asc()).show(2)
df.orderBy(expr("ORIGIN_COUNTRY_NAME DESC"), expr("DEST_COUNTRY_NAME ASC")).show(2)

#### 11) Restrict row limit

In [None]:
df.limit(5).show()
df.orderBy(expr("count desc")).limit(6).show()

In [None]:
# Ex 3

In [None]:
# Ex 4