# **2. Spark SQL**

1. Purpose
  - To run relational processing inside Spark programs
  - To optimize queries automatically using schema information
  - To make external datasets easy to use


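2. **DataFrame**
  - What the RDD is to Spark Core, the DataFrame is to Spark SQL
  - A DataFrame can be thought of as a tabular dataset
  - Conceptually, it is an RDD with a schema applied
  - Benefits
    - Easy to use together with other Spark modules such as MLlib and Spark Streaming
    - Queries are optimized automatically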


3. **SparkSession**
  - What SparkContext is to Spark Core, SparkSession is to Spark SQL


4. SQL statements available in Spark
  - Almost identical to Hive Query Language (HiveQL)
      - Select
      - From
      - Where
      - Count
      - Having
      - Group By
      - Order By
      - Sort By
      - Distinct
      - Join

#### **Creating a SparkSession and a DataFrame**

1. Spark Session

`spark = SparkSession.builder.appName("test-app").getOrCreate()`

2. DataFrame
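- Define a schema on top of an RDD, then convert it

  - Building the RDD

    `from pyspark.sql import Row
    sc = spark.sparkContext
    lines = sc.textFile("example.csv")  # each element is one line of raw text
    data = lines.map(lambda x: x.split(","))
    preprocessed = data.map(lambda x: Row(name=x[0], price=int(x[1])))`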

  - Infer: let Spark infer the schema automatically

    `df = spark.createDataFrame(preprocessed)`

  - Specify: the user defines the schema explicitly

      `from pyspark.sql.types import StructType, StructField, StringType, IntegerType
      schema = StructType([
          StructField("name", StringType(), True),
          StructField("price", IntegerType(), True)
      ])`

    `spark.createDataFrame(preprocessed, schema).show()`

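- Loading data from files such as CSV and JSON

  `dataframe = spark.read.json(json_file)`

  `dataframe_txt = spark.read.text(txt_file)`

  `dataframe_csv = spark.read.csv(csv_file)`

  `dataframe_parquet = spark.read.load(parquet_file)` (`load()` reads Parquet unless another format is specified or configured)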



#### **createOrReplaceTempView**

- To query a DataFrame as if it were a database table, first register it as a temporary view with createOrReplaceTempView() (in effect, giving it a nickname)

  `data.createOrReplaceTempView("mobility_data")`

  `spark.sql("SELECT pickup_datetime FROM mobility_data LIMIT 5").show()`

In [None]:
# mount Drive to access data files

from google.colab import drive
drive.mount('./mount')

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=63f8ea904ccc30481b45ade2ab096a6461962f5d9af384950e1e0b49cce81b2e
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


1. Create a Spark instance

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("learn-sql").getOrCreate()

2. Data

In [None]:
stocks = [
	('Google', 'GOOGL', 'USA', 2984, 'USD'),
	('Netflix', 'NFLX', 'USA', 645, 'USD'),
	('Amazon', 'AMZN', 'USA', 3518, 'USD'),
	('Tesla', 'TSLA', 'USA', 1222, 'USD'),
	('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
	('Toyota', '7203', 'Japan', 2006, 'JPY'),
	('Samsung', '005930', 'Korea', 70600, 'KRW'),
	('Kakao', '035720', 'Korea', 125000, 'KRW')
]

3. Create the schema

In [None]:
stockSchema = ["name","ticker","country","price","currency"]

4. Create the DataFrame

In [None]:
df = spark.createDataFrame(data = stocks, schema = stockSchema)

# Check the data types
df.dtypes

[('name', 'string'),
 ('ticker', 'string'),
 ('country', 'string'),
 ('price', 'bigint'),
 ('currency', 'string')]

In [None]:
# Check the DataFrame
df.show()

+-------+------+---------+------+--------+
|   name|ticker|  country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL|      USA|  2984|     USD|
|Netflix|  NFLX|      USA|   645|     USD|
| Amazon|  AMZN|      USA|  3518|     USD|
|  Tesla|  TSLA|      USA|  1222|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY|
|Samsung|005930|    Korea| 70600|     KRW|
|  Kakao|035720|    Korea|125000|     KRW|
+-------+------+---------+------+--------+



5. Register as a temporary view under the name stocks

In [None]:
df.createOrReplaceTempView("stocks")

6. Select the company names

In [None]:
spark.sql("select name from stocks").show()

+-------+
|   name|
+-------+
| Google|
|Netflix|
| Amazon|
|  Tesla|
|Tencent|
| Toyota|
|Samsung|
|  Kakao|
+-------+



7. Select multiple columns

In [None]:
spark.sql("select name, price from stocks").show()

+-------+------+
|   name| price|
+-------+------+
| Google|  2984|
|Netflix|   645|
| Amazon|  3518|
|  Tesla|  1222|
|Tencent|   483|
| Toyota|  2006|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+



8. Conditions (where)

In [None]:
# Select name and price for rows where country is Korea

spark.sql("select name, price from stocks where country = 'Korea'").show()

+-------+------+
|   name| price|
+-------+------+
|Samsung| 70600|
|  Kakao|125000|
+-------+------+



In [None]:
# Select name and price for rows where price > 2000

spark.sql("select name, price from stocks where price > 2000").show()

+-------+------+
|   name| price|
+-------+------+
| Google|  2984|
| Amazon|  3518|
| Toyota|  2006|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+



In [None]:
# Select name and price for rows where price > 2000 and country is USA

spark.sql("select name,price from stocks where price>2000 and country = 'USA'").show()

+------+-----+
|  name|price|
+------+-----+
|Google| 2984|
|Amazon| 3518|
+------+-----+



9. like: useful for string matching

In [None]:
# Select name and price for countries whose name starts with U

spark.sql("select name, price from stocks where country like 'U%'").show()

+-------+-----+
|   name|price|
+-------+-----+
| Google| 2984|
|Netflix|  645|
| Amazon| 3518|
|  Tesla| 1222|
+-------+-----+



In [None]:
# Select name and price for countries starting with U where the company name contains no "e"

spark.sql("select name, price from stocks where country like 'U%' and name not like '%e%'").show()

+------+-----+
|  name|price|
+------+-----+
|Amazon| 3518|
+------+-----+



10. between

In [None]:
# Select name and price where 1000 <= price <= 10000 (between includes both endpoints)

spark.sql("select name, price from stocks where price between 1000 and 10000").show()

+------+-----+
|  name|price|
+------+-----+
|Google| 2984|
|Amazon| 3518|
| Tesla| 1222|
|Toyota| 2006|
+------+-----+



11. Nested queries (subqueries)

In [None]:
# Select name, price, and currency for USD stocks priced higher than Tesla

spark.sql("select name, price, currency from stocks \
where currency = 'USD' and price > (select price from stocks where name = 'Tesla')").show()

+------+-----+--------+
|  name|price|currency|
+------+-----+--------+
|Google| 2984|     USD|
|Amazon| 3518|     USD|
+------+-----+--------+



12. order by

In [None]:
# Ascending order

spark.sql("select name, price from stocks order by price asc").show()

+-------+------+
|   name| price|
+-------+------+
|Tencent|   483|
|Netflix|   645|
|  Tesla|  1222|
| Toyota|  2006|
| Google|  2984|
| Amazon|  3518|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+



In [None]:
# Descending order

spark.sql("select name, price from stocks order by price desc").show()

+-------+------+
|   name| price|
+-------+------+
|  Kakao|125000|
|Samsung| 70600|
| Amazon|  3518|
| Google|  2984|
| Toyota|  2006|
|  Tesla|  1222|
|Netflix|   645|
|Tencent|   483|
+-------+------+



In [None]:
# Order by the length of the company name

spark.sql("select name, price from stocks order by length(name)").show()

+-------+------+
|   name| price|
+-------+------+
|  Tesla|  1222|
|  Kakao|125000|
| Amazon|  3518|
| Toyota|  2006|
| Google|  2984|
|Netflix|   645|
|Samsung| 70600|
|Tencent|   483|
+-------+------+



13. grouping aggregate

In [None]:
# Sum

spark.sql("select sum(price) from stocks where country = 'Korea'").show()

+----------+
|sum(price)|
+----------+
|    195600|
+----------+



In [None]:
# Mean

spark.sql("select mean(price) from stocks where country = 'Korea'").show()

+-----------+
|mean(price)|
+-----------+
|    97800.0|
+-----------+



In [None]:
# Number of rows where country is 'Korea'

spark.sql("select count(price) from stocks where country = 'Korea'").show()

+------------+
|count(price)|
+------------+
|           2|
+------------+



14. in

In [None]:
# Number of rows where country is 'USA' or 'Korea'

spark.sql("select count(price) from stocks where country in ('Korea', 'USA')").show()

+------------+
|count(price)|
+------------+
|           6|
+------------+



15. join: working with multiple tables (real-world work usually draws on data from several tables, so this is useful)

In [None]:
# Bring in a new dataset
earnings = [
('Google', 27.99, 'USD'),
('Netflix', 2.56, 'USD'),
('Amazon', 6.12, 'USD'),
('Tesla', 1.86, 'USD'),
('Tencent', 11.01, 'HKD'),
('Toyota', 224.82, 'JPY'),
('Samsung', 1780., 'KRW'),
('Kakao', 705., 'KRW')
]

# Specify the schema types explicitly
from pyspark.sql.types import StringType, FloatType, StructType, StructField

earningsSchema = StructType([
	StructField("name", StringType(), True),
	StructField("eps", FloatType(), True),
	StructField("currency", StringType(), True)
])


earningsDF = spark.createDataFrame(data = earnings, schema = earningsSchema)

In [None]:
# Check the DataFrame's schema types

earningsDF.dtypes

[('name', 'string'), ('eps', 'float'), ('currency', 'string')]

In [None]:
earningsDF.createOrReplaceTempView("earnings")

In [None]:
# View the DataFrame's data

earningsDF.select("*").show()

+-------+------+--------+
|   name|   eps|currency|
+-------+------+--------+
| Google| 27.99|     USD|
|Netflix|  2.56|     USD|
| Amazon|  6.12|     USD|
|  Tesla|  1.86|     USD|
|Tencent| 11.01|     HKD|
| Toyota|224.82|     JPY|
|Samsung|1780.0|     KRW|
|  Kakao| 705.0|     KRW|
+-------+------+--------+



Inner Join

In [None]:
# Join key: stocks.name = earnings.name

spark.sql("select * from stocks join earnings on stocks.name = earnings.name").show()

+-------+------+---------+------+--------+-------+------+--------+
|   name|ticker|  country| price|currency|   name|   eps|currency|
+-------+------+---------+------+--------+-------+------+--------+
| Amazon|  AMZN|      USA|  3518|     USD| Amazon|  6.12|     USD|
| Google| GOOGL|      USA|  2984|     USD| Google| 27.99|     USD|
|  Kakao|035720|    Korea|125000|     KRW|  Kakao| 705.0|     KRW|
|Netflix|  NFLX|      USA|   645|     USD|Netflix|  2.56|     USD|
|Samsung|005930|    Korea| 70600|     KRW|Samsung|1780.0|     KRW|
|Tencent|  0700|Hong Kong|   483|     HKD|Tencent| 11.01|     HKD|
|  Tesla|  TSLA|      USA|  1222|     USD|  Tesla|  1.86|     USD|
| Toyota|  7203|    Japan|  2006|     JPY| Toyota|224.82|     JPY|
+-------+------+---------+------+--------+-------+------+--------+



In [None]:
#PER(Price-earnings ratio) : Price / EPS

spark.sql("select stocks.name, (stocks.price/earnings.eps) from stocks join earnings on stocks.name = earnings.name").show()

+-------+------------------+
|   name|     (price / eps)|
+-------+------------------+
| Amazon| 574.8366120563447|
| Google| 106.6095042658442|
|  Kakao| 177.3049645390071|
|Netflix| 251.9531306315913|
|Samsung|39.662921348314605|
|Tencent| 43.86920889728746|
|  Tesla|  656.989242258975|
| Toyota| 8.922693419839167|
+-------+------------------+
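The ratios above differ slightly from what a calculator gives (for Amazon, 3518 / 6.12 is about 574.8366013, not 574.8366121). A likely explanation is that `eps` was declared as `FloatType`, a 32-bit float, so 6.12 is stored with a small rounding error before the division is carried out in double precision. The pure-Python sketch below reproduces the effect without Spark; the helper `to_float32` is written for this example and is not part of PySpark.

```python
import struct


def to_float32(x: float) -> float:
    """Round a 64-bit Python float to the nearest 32-bit float value."""
    return struct.unpack("f", struct.pack("f", x))[0]


eps_stored = to_float32(6.12)    # the value a 32-bit float column can actually hold
per_float32 = 3518 / eps_stored  # division itself is done in double precision
per_float64 = 3518 / 6.12        # what a 64-bit eps column would give

print(per_float32)
print(per_float64)
```

Declaring the column as `DoubleType` (or a decimal type) in `earningsSchema` would be one way to avoid the discrepancy if exact ratios matter.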



In [None]:
# Stop the session when finished

spark.stop()

## **Hands-on practice**

### Data: TLC Trip Record Data
- More than 10 years of taxi and mobility service records
- Grows by about 20 GB per year
- https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

#### Data fields
- hvfhs_license_num : company license number
- dispatching_base_num : dispatching base (regional) license number
- pickup_datetime : pickup time
- dropoff_datetime : drop-off time
- PULocationID : pickup location ID
- DOLocationID : drop-off location ID
- SR_Flag : shared-ride flag

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("taxi-analysis").getOrCreate()

In [None]:
# Data

trip_files = "/content/drive/MyDrive/Colab Notebooks/2022/BOAZ/학기 세션/fhvhv_tripdata_2020-03.csv"
zone_file = "/content/drive/MyDrive/Colab Notebooks/2022/BOAZ/학기 세션/taxi+_zone_lookup.csv"

# Create the DataFrames
trips_df = spark.read.csv(trip_files, inferSchema = True, header = True)
zone_df = spark.read.csv(zone_file, inferSchema = True, header = True)

In [None]:
# Check the schemas

trips_df.printSchema()
zone_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: integer (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable

In [None]:
trips_df.createOrReplaceTempView("trips")
zone_df.createOrReplaceTempView("zone")

In [None]:
# Stop the session when finished

spark.stop()

Reference


https://spark.apache.org/docs/latest/api/python/#