PySpark을 로컬머신에 설치하고 노트북을 사용하기 보다는 머신러닝 관련 다양한 라이브러리가 이미 설치되었고 좋은 하드웨어를 제공해주는 Google Colab을 통해 실습을 진행한다.

이를 위해 pyspark과 Py4J 패키지를 설치한다. Py4J 패키지는 파이썬 프로그램이 자바가상머신상의 오브젝트들을 접근할 수 있게 해준다. Local Standalone Spark을 사용한다.

In [1]:
!pip install pyspark==3.0.1 py4j==0.10.9 



지금부터 실습은 Redshift에 있는 데이터를 가지고 해볼 예정이고 그래서 Redshift 관련 JAR 파일을 설치해야함

In [2]:
!cd /usr/local/lib/python3.7/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

--2021-07-22 04:02:08--  https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.141.94
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.141.94|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2413910 (2.3M) [application/java-archive]
Saving to: ‘RedshiftJDBC42-no-awssdk-1.2.20.1043.jar.2’


2021-07-22 04:02:08 (16.2 MB/s) - ‘RedshiftJDBC42-no-awssdk-1.2.20.1043.jar.2’ saved [2413910/2413910]



**Spark Session:** 이번 SparkSession은 spark.jars를 통해 앞서 다운로드받은 Redshift 연결을 위한 JDBC 드라이버를 사용함 (.config("spark.jars", ...)

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/usr/local/lib/python3.6/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
    .getOrCreate()

In [4]:
spark

# **SparkSQL 맛보기**

판다스로 일단 CSV 파일 하나 로드하기

In [5]:
import pandas as pd

namegender_pd = pd.read_csv("https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv")

In [6]:
namegender_pd.head()

Unnamed: 0,name,gender
0,Adaleigh,F
1,Amryn,Unisex
2,Apurva,Unisex
3,Aryion,M
4,Alixia,F


In [7]:
namegender_pd.groupby(["gender"]).count()

Unnamed: 0_level_0,name
gender,Unnamed: 1_level_1
F,65
M,28
Unisex,7


판다스 데이터프레임을 Spark 데이터프레임으로 변환하기

In [8]:
namegender_df = spark.createDataFrame(namegender_pd)

In [9]:
namegender_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)



In [10]:
namegender_df.show()  # Spark cluster위에 데이터가 있음, python단에서 보여주기 위해 -> 데이터 collect해서 보여줌

+----------+------+
|      name|gender|
+----------+------+
|  Adaleigh|     F|
|     Amryn|Unisex|
|    Apurva|Unisex|
|    Aryion|     M|
|    Alixia|     F|
|Alyssarose|     F|
|    Arvell|     M|
|     Aibel|     M|
|   Atiyyah|     F|
|     Adlie|     F|
|    Anyely|     F|
|    Aamoni|     F|
|     Ahman|     M|
|    Arlane|     F|
|   Armoney|     F|
|   Atzhiry|     F|
| Antonette|     F|
|   Akeelah|     F|
| Abdikadir|     M|
|    Arinze|     M|
+----------+------+
only showing top 20 rows



In [11]:
namegender_df.groupBy(["gender"]).count().collect()
# count까지만 하면 spark cluster에 새로운 dataframe 생성
# -> Python(local)에서 보여주기 위해 collect 함수 사용

[Row(gender='F', count=65),
 Row(gender='M', count=28),
 Row(gender='Unisex', count=7)]

In [12]:
# https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53

데이터프레임을 테이블뷰로 만들어서 SparkSQL로 처리해보기

In [13]:
# df를 spark sql에서 쓰고 싶은 테이블 이름 설정
namegender_df.createOrReplaceTempView("namegender")

In [14]:
# spark df에서 sql문 실행해서 새로운 dataframe 생성
namegender_group_df = spark.sql("SELECT gender, count(1) FROM namegender GROUP BY 1")

In [15]:
namegender_group_df.collect()

[Row(gender='F', count(1)=65),
 Row(gender='M', count(1)=28),
 Row(gender='Unisex', count(1)=7)]

Redshift와 연결해서 테이블들을 데이터프레임으로 로딩하기

In [16]:
df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1!*") \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

In [17]:
df_session_timestamp = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1!*") \
    .option("dbtable", "raw_data.session_timestamp") \
    .load()

In [18]:
df_user_session_channel.createOrReplaceTempView("user_session_channel")

In [19]:
df_session_timestamp.createOrReplaceTempView("session_timestamp")

In [20]:
channel_count_df = spark.sql("""
    SELECT channel, count(distinct userId) uniqueUsers
    FROM session_timestamp st
    JOIN user_session_channel usc ON st.sessionID = usc.sessionID
    GROUP BY 1
    ORDER BY 1
""")   # 채널별 유니크한 사용자 수

In [21]:
channel_count_df

DataFrame[channel: string, uniqueUsers: bigint]

In [22]:
channel_count_df.show()
# 오래걸림 b/c spark는 lazy evaluation임

+---------+-----------+
|  channel|uniqueUsers|
+---------+-----------+
| Facebook|        889|
|   Google|        893|
|Instagram|        895|
|    Naver|        882|
|  Organic|        895|
|  Youtube|        889|
+---------+-----------+



In [23]:
channel_with_o_count_df = spark.sql("""
    SELECT COUNT(1)
    FROM user_session_channel
    WHERE channel like '%o%'
""")

In [24]:
channel_with_o_count_df.collect()

[Row(count(1)=50864)]