In [1]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("spark_sql_basic2")
sc   = SparkContext(conf=conf)

In [2]:
# RDD만을 이용한 데이터 추출

In [3]:

movies_rdd = sc.parallelize([
    (1, ("어벤져스", "마블")),
    (2, ("슈퍼맨", "DC")),
    (3, ("배트맨", "DC")),
    (4, ("겨울왕국", "디즈니")),
    (5, ("아이언맨", "마블"))
])


attendances_rdd = sc.parallelize([
    (1, (13934592, "KR")),
    (2, (2182227,"KR")),
    (3, (4226242, "KR")),
    (4, (10303058, "KR")),
    (5, (4300365, "KR"))
])

In [4]:
# 마블 영화 중 관객 수가 500만 이상인 영화를 가져오기

In [7]:
# CASE1. join 먼저, filter 나중에
movie_att = movies_rdd.join(attendances_rdd)
print(movie_att.take(3))
movie_att.filter(
    lambda x : x[1][0][1] == "마블" and x[1][1][0] > 5000000
).collect()

[(2, (('슈퍼맨', 'DC'), (2182227, 'KR'))), (4, (('겨울왕국', '디즈니'), (10303058, 'KR'))), (1, (('어벤져스', '마블'), (13934592, 'KR')))]


[(1, (('어벤져스', '마블'), (13934592, 'KR')))]

In [11]:
# CASE 2. filter 먼저, join 나중에 -> 더 유리
filtered_movies = movies_rdd.filter(lambda x : x[1][1] == '마블')
filtered_att = attendances_rdd.filter(lambda x : x[1][0] > 5000000)

filtered_movies.join(filtered_att).collect()

[(1, (('어벤져스', '마블'), (13934592, 'KR')))]

In [12]:
sc.stop()

In [13]:
# Spark SQL 사용해 보기

In [14]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("spark-sql").getOrCreate()

In [15]:
# 컬럼 추가
movies = [
    (1, "어벤져스", "마블", 2012, 4, 26),
    (2, "슈퍼맨", "DC", 2013, 6, 13),
    (3, "배트맨", "DC", 2008, 8, 6),
    (4, "겨울왕국", "디즈니", 2014, 1, 16),
    (5, "아이언맨", "마블", 2008, 4, 30)
]

In [16]:
#스키마를 알아야 한다.
movie_schema = ["id", "name", "company", "year", "month", "day"]

In [17]:
# 2. 데이터 프레임 만들기

In [18]:
df = spark.createDataFrame(data=movies, schema=movie_schema)

In [19]:
df.select("name").show()

+--------+
|    name|
+--------+
|어벤져스|
|  슈퍼맨|
|  배트맨|
|겨울왕국|
|아이언맨|
+--------+



In [20]:
df.filter(df.year >= 2010).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  4|겨울왕국| 디즈니|2014|    1| 16|
+---+--------+-------+----+-----+---+



In [24]:
df.createOrReplaceTempView("movies")

In [27]:
df

DataFrame[id: bigint, name: string, company: string, year: bigint, month: bigint, day: bigint]

In [29]:
df.dtypes

[('id', 'bigint'),
 ('name', 'string'),
 ('company', 'string'),
 ('year', 'bigint'),
 ('month', 'bigint'),
 ('day', 'bigint')]

In [32]:
df.show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  4|겨울왕국| 디즈니|2014|    1| 16|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



In [34]:
df.filter(df.year >= 2010).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  4|겨울왕국| 디즈니|2014|    1| 16|
+---+--------+-------+----+-----+---+



In [36]:
df

DataFrame[id: bigint, name: string, company: string, year: bigint, month: bigint, day: bigint]

In [39]:
# 년,월,일 
df.select("year", "month", "day").show()

+----+-----+---+
|year|month|day|
+----+-----+---+
|2012|    4| 26|
|2013|    6| 13|
|2008|    8|  6|
|2014|    1| 16|
|2008|    4| 30|
+----+-----+---+



In [40]:
# 2013 이후 영화
df.filter(df.year >= 2013).show()


+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  4|겨울왕국| 디즈니|2014|    1| 16|
+---+--------+-------+----+-----+---+



In [41]:
# 마블, DC 같이 꺼내기  
df.filter((df.company == "마블") | (df.company == "DC")).show()


+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



In [23]:
# 2010년 이후에 개봉한 영화를 조회

query = """

"""
spark.sql(query).show()

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 3, pos 0)

== SQL ==
^^^


In [None]:
# 2012년도 이전에 개봉한 영화의 이름과 회사를 출력
query = """

"""
spark.sql(query).show()

In [None]:
# like 문자열 데이터에서 특정 단어나 문장을 포함한 데이터를 찾을 때
# % 기호를 사용해서 문장이 매칭되는지 확인 가능!
# 제목이 ~~맨으로 끝나는 데이터의 모든 정보를 조회
query = """

"""
spark.sql(query).show()

In [None]:

# BETWEEN 특정 데이터와 데이터 사이를 조회

# 개봉 월이 4 ~ 8월 사이. 4 <= 개봉월 <= 8


In [None]:
# Join 구현하기

In [None]:

attendances = [
    (1, 13934592., "KR"),
    (2, 2182227.,"KR"),
    (3, 4226242., "KR"),
    (4, 10303058., "KR"),
    (5, 4300365., "KR")
]

In [None]:
# 직접 스키마 지정해 보기
from pyspark.sql.types import StringType, FloatType\
    , IntegerType\
    , StructType, StructField

In [None]:
att_schema = StructType([ # 모든 컬럼의 타입을 통칭 - 컬럼 데이터의 집합
    StructField("id", IntegerType(), True), # StructField : 컬럼
    StructField("att", FloatType(), True),
    StructField("theater_country", StringType(), True)
])

In [None]:

att_df = spark.createDataFrame(
    data=attendances,
    schema=att_schema
)

att_df.dtypes

In [None]:
att_df.createOrReplaceTempView("att")

In [None]:
# 데이터 프레임 API

In [None]:
# select
df.select("*").collect()

In [None]:
df.select("name", "company").collect()

In [None]:
df.select(df.name, (df.year-2000).alias("year")).show()

In [None]:
# agg : Aggreagte의 약자로써, 그룹핑 후 데이터를 하나로 합쳐주는 역할
df.agg({"id": "count"}).collect()

In [None]:
from pyspark.sql import functions as F
df.agg(F.min(df.year)).collect()

In [None]:
df.groupBy().avg().collect()

In [None]:
# 회사별 개봉월의 평균
df.groupBy('company').agg({"month": "mean"}).collect()

In [None]:
# 회사 별 월 별 영화 개수 정보


In [None]:
# join : 다른 데이터 프레임과 사용자가 지정한 컬럼을 기준으로 합치는 작업
df.join(att_df, 'id').select(df.name, att_df.att).show()

In [None]:
# select, where, orderBy 절 사용
marvel_df = df.select("name", "company", "year").where("company=='마블'").orderBy("id")
marvel_df.collect()

In [None]:
spark.stop()
sc.stop()

In [None]:
ㅌ

In [None]:
# SQL 최적화

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trip_count_sql").getOrCreate()

In [None]:
trip_file = "data/fhvhv_tripdata_2020-03.csv"

In [None]:
# inferSchema : 자동으로 스키마 예측하게 하기
data = spark.read.csv(trip_file, inferSchema=True, header=True)

In [None]:
data.createOrReplaceTempView("mobility_data")

In [None]:
query = """
select *
from mobility_data
limit 5
"""
spark.sql(query).show()

In [None]:
# 스파크 SQL을 사용하는 이유

In [None]:
query = """

select split(pickup_datetime, ' ')[0] as pickup_date, count(*) as trips
from mobility_data

group by pickup_date
"""

spark.sql(query).show()

In [None]:
# 실행 계획 살펴보기
spark.sql(query).explain(True)

In [None]:
# 두번째 쿼리
spark.sql("""select 
                pickup_date, 
                count(*) as trips
             from ( select
                          split(pickup_datetime, ' ')[0] as pickup_date
                          from mobility_data )
             group by pickup_date""").explain(True)

In [None]:
spark.stop()