Let's repeat the same analysis of the Apple stock data used in yesterday's exercise, this time with Spark SQL. Every answer must be produced through PySpark's Spark SQL.

First, install PySpark and Py4J.

In [1]:
!pip install pyspark==3.0.1 py4j==0.10.9 

Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
     |████████████████████████████████| 204.2 MB 31 kB/s
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 18.4 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=211f9e99f2adf12268af6d6ce3db11f054a88a704cb6d0e148d4384e4a9c91fc
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


#### Creating a Spark Session

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

#### Loading the Apple stock CSV file: https://pyspark-test-sj.s3-us-west-2.amazonaws.com/appl_stock.csv
First load it into a pandas DataFrame, then convert it to a Spark DataFrame.

In [3]:
import pandas as pd

apple_pandas_df = pd.read_csv("https://pyspark-test-sj.s3-us-west-2.amazonaws.com/appl_stock.csv")
apple_spark_df = spark.createDataFrame(apple_pandas_df)

Register apple_spark_df as a temporary view named apple_stock so that SQL queries can refer to it by name.

In [4]:
apple_spark_df.createOrReplaceTempView("apple_stock")

#### 1> Print the schema

In [5]:
spark.sql("desc apple_stock")

DataFrame[col_name: string, data_type: string, comment: string]

#### 2> Print the first 5 records

In [6]:
spark.sql("SELECT * FROM apple_stock LIMIT 5").show()

+----------+----------+----------+----------+----------+---------+------------------+
|      Date|      Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+----------+----------+----------+----------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.380001|214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|213.249994|214.379993|150476200|         27.774976|
|2010-01-06|214.379993|    215.23|210.750004|210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|209.050005|    210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.060005|211.980005|111902700|         27.464034|
+----------+----------+----------+----------+----------+---------+------------------+



#### 3> What is the average of the Close column?

In [7]:
spark.sql("SELECT AVG(close) FROM apple_stock").show()

+-----------------+
|       avg(close)|
+-----------------+
|312.9270656379114|
+-----------------+



#### 4> What are the maximum and minimum values of the Volume column?

In [8]:
spark.sql("SELECT MAX(volume), MIN(volume) FROM apple_stock").show()

+-----------+-----------+
|max(volume)|min(volume)|
+-----------+-----------+
|  470249500|   11475900|
+-----------+-----------+



#### Bonus question: Create a DataFrame with a new column named HV ratio, whose value is computed as High/Volume

In [9]:
apple_spark_df_with_hv = spark.sql("""
    SELECT *, high/volume as hvratio FROM apple_stock
""")   

In [10]:
apple_spark_df_with_hv.show(5)

+----------+----------+----------+----------+----------+---------+------------------+--------------------+
|      Date|      Open|      High|       Low|     Close|   Volume|         Adj Close|             hvratio|
+----------+----------+----------+----------+----------+---------+------------------+--------------------+
|2010-01-04|213.429998|214.499996|212.380001|214.009998|123432400|         27.727039|1.737793286041590...|
|2010-01-05|214.599998|215.589994|213.249994|214.379993|150476200|         27.774976|1.432718223878593...|
|2010-01-06|214.379993|    215.23|210.750004|210.969995|138040000|27.333178000000004|1.559185743262822...|
|2010-01-07|    211.75|212.000006|209.050005|    210.58|119282800|          27.28265|1.777288980473295...|
|2010-01-08|210.299994|212.000006|209.060005|211.980005|111902700|         27.464034|1.894503045949740...|
+----------+----------+----------+----------+----------+---------+------------------+--------------------+
only showing top 5 rows



#### Bonus question: What is the average of the Close column for each month?

In [11]:
spark.sql("""
    SELECT Month(date) month, AVG(close) FROM apple_stock GROUP BY 1 ORDER BY 1
""").show()

+-----+------------------+
|month|        avg(close)|
+-----+------------------+
|    1| 322.2097142571429|
|    2| 321.3595563037037|
|    3| 332.9115673137254|
|    4| 340.5104108150685|
|    5|  351.621020857143|
|    6|288.12546566000003|
|    7| 281.7221621148649|
|    8|300.43858096129026|
|    9| 301.0763195902777|
|   10|308.30552563157903|
|   11| 306.2725174895105|
|   12|302.35053626845644|
+-----+------------------+

