<a href="https://colab.research.google.com/github/learn-programmers/programmers_kdt_II/blob/main/9%EC%A3%BC%EC%B0%A8_PySpark_%EA%B8%B0%EB%B3%B8_3%EC%9D%BC%EC%B0%A8_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

PySpark을 로컬머신에 설치하고 노트북을 사용하기 보다는 머신러닝 관련 다양한 라이브러리가 이미 설치되었고 좋은 하드웨어를 제공해주는 Google Colab을 통해 실습을 진행한다.

이를 위해 pyspark과 Py4J 패키지를 설치한다. Py4J 패키지는 파이썬 프로그램이 자바가상머신상의 오브젝트들을 접근할 수 있게 해준다. Local Standalone Spark을 사용한다.

In [1]:
!pip install pyspark==3.0.1 py4j==0.10.9 

Collecting pyspark==3.0.1
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 65kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 43.3MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=0ee42e32413b5e1fa6653634c23c918d8652ba953ee9d4b41047acfa1832cc81
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0

지금부터 실습은 Redshift에 있는 데이터를 가지고 해볼 예정이고 그래서 Redshift 관련 JAR 파일을 설치해야함

In [2]:
!cd /usr/local/lib/python3.6/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

--2021-02-04 14:12:13--  https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.243.110
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.243.110|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2413910 (2.3M) [application/java-archive]
Saving to: ‘RedshiftJDBC42-no-awssdk-1.2.20.1043.jar’


2021-02-04 14:12:13 (5.24 MB/s) - ‘RedshiftJDBC42-no-awssdk-1.2.20.1043.jar’ saved [2413910/2413910]



**Spark Session:** 이번 SparkSession은 spark.jars를 통해 앞서 다운로드받은 Redshift 연결을 위한 JDBC 드라이버를 사용함 (.config("spark.jars", ...)

In [3]:
from pyspark.sql import SparkSession
# config가 들어간 것 외에는 이전에 스파크 세션 만드는 것과 동일하다.

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/usr/local/lib/python3.6/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
    .getOrCreate()

In [4]:
spark

# **SparkSQL 맛보기**

판다스로 일단 CSV 파일 하나 로드하기

In [5]:
import pandas as pd

namegender_pd = pd.read_csv("https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv")

In [6]:
namegender_pd.head() # 파일에 뭐가 있는지 레코드를 살펴보기
# 어떤 이름은 여자가 쓰고, 어떤 이름은 남자가 쓰고, 어떤 이름은 남녀 상관없이 쓰는지

Unnamed: 0,name,gender
0,Adaleigh,F
1,Amryn,Unisex
2,Apurva,Unisex
3,Aryion,M
4,Alixia,F


In [7]:
namegender_pd.groupby(["gender"]).count() # 성별 별로 이름 카운트

Unnamed: 0_level_0,name
gender,Unnamed: 1_level_1
F,65
M,28
Unisex,7


판다스 데이터프레임을 Spark 데이터프레임으로 변환하기

In [8]:
# 이를 spark위의 rdd나 데이터프레임으로 사용하기 위해 변환.
# 판다스 데이터프레임이 스파크 데이터프레임으로 바뀜.
namegender_df = spark.createDataFrame(namegender_pd)

In [10]:
namegender_df.printSchema() # 스키마(구조) 출력
# 스파크 데이터 프레임으로 바뀌면서 두개의 필드를 갖는 데이터로 바뀜(이름, 성별)

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)



In [12]:
namegender_df.show() # 스파크 데이터프레임으로 바뀐 레코드들을 출력해보자.
# 이 뒷단에서는 데이터를 collect한 뒤 보여주고 있다.
# 스파크 데이터프레임은 로컬 환경이 아닌 스파크 클러스터 위에 있다.
# 따라서 파이썬 드라이버로 끌어온 뒤 보여줘야 하기 때문에, 뒤에서 collect한 뒤 show가 되고 있는 것.


+----------+------+
|      name|gender|
+----------+------+
|  Adaleigh|     F|
|     Amryn|Unisex|
|    Apurva|Unisex|
|    Aryion|     M|
|    Alixia|     F|
|Alyssarose|     F|
|    Arvell|     M|
|     Aibel|     M|
|   Atiyyah|     F|
|     Adlie|     F|
|    Anyely|     F|
|    Aamoni|     F|
|     Ahman|     M|
|    Arlane|     F|
|   Armoney|     F|
|   Atzhiry|     F|
| Antonette|     F|
|   Akeelah|     F|
| Abdikadir|     M|
|    Arinze|     M|
+----------+------+
only showing top 20 rows



In [11]:
# namegender_df는 스파크 데이터프레임이다.
# 스파크 데이터프레임의 연산을 써서 성별에 따라 몇개의 이름이 있는지 카운트해보자.
# namegender_df.groupBy(["gender"]).count()라고만 하면 새로운 데이터 프레임이 만들어질 뿐, 스파크 클러스터에 있어 로컬에서 보이지 않는다.
# 로컬에서 보이게 하려면 .collect()를 붙여 로컬의 파이썬 환경으로 불러와야 한다.
namegender_df.groupBy(["gender"]).count().collect()

[Row(gender='F', count=65),
 Row(gender='M', count=28),
 Row(gender='Unisex', count=7)]

In [None]:
# https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53

데이터프레임을 테이블뷰로 만들어서 SparkSQL로 처리해보기

In [13]:
# 데이터프레임에 테이블 이름을 준뒤 스파크sql로 처리해보자.
# namegender_df라는 스파크 데이터 프레임을 namegender라는 이름의 테이블로 엑세스할 수 있게 한다.
namegender_df.createOrReplaceTempView("namegender")

In [14]:
# spark.sql을 통해 다양한 sql문을 실행할 수 있다.
# GROUP BY 1이므로 SELECT뒤의 첫번째 필드인 gender를 기준으로 레코드들을 그룹핑 한 뒤,
# 동일한 값을 갖는 그룹 내의 레코드들을 카운트한다.
# 그 결과가 새로운 스파크 데이터 프레임인 namegender_group_df로 만들어진다.
namegender_group_df = spark.sql("SELECT gender, count(1) FROM namegender GROUP BY 1")

In [15]:
# 위의 결과를 collect로 파이썬 환경으로 받아와서 프린트한다.
# namegender_df.groupBy(["gender"]).count().collect()의 결과와 동일하다.
namegender_group_df.collect()

[Row(gender='F', count(1)=65),
 Row(gender='M', count(1)=28),
 Row(gender='Unisex', count(1)=7)]

Redshift와 연결해서 테이블들을 데이터프레임으로 로딩하기

In [16]:
# Redshift와 연결해서 테이블들을 데이터프레임으로 로딩하고
# 그 데이터프레임들에게 테이블이름을 줘서 스파크 sql로 처리해볼 것이다.
# raw_data.user_session_channel 테이블을 df_user_session_channel라는 데이터프레임으로 스파크에서 로딩한다.
# 엑세스할때 유저아이디, 패스워드, 필요한 드라이버 쓰는것 필수!
# 드라이버는 앞에서 지정한 jar 파일 안에 들어가있다.
df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/prod?user=guest&password=Guest1!*") \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

In [17]:
# raw_data.session_timestamp라는 RedShift의 테이블을 df_session_timestamp라는 데이터프레임으로 받아온다.
# 엑세스할때 유저아이디, 패스워드, 필요한 드라이버 쓰는것 필수!
# 드라이버는 앞에서 지정한 jar 파일 안에 들어가있다.
df_session_timestamp = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/prod?user=guest&password=Guest1!*") \
    .option("dbtable", "raw_data.session_timestamp") \
    .load()

In [18]:
# df_user_session_channel 데이터 프레임은 앞으로 spark sql에서 
# user_session_channe라는 이름으로 지칭한다.
df_user_session_channel.createOrReplaceTempView("user_session_channel")

In [19]:
# df_session_timestamp 데이터 프레임은 앞으로 spark sql에서 
# session_timestamp라는 이름으로 지칭한다.
df_session_timestamp.createOrReplaceTempView("session_timestamp")

In [20]:
# sql만 써서 데이터 프로세싱을 할 수 있다.
# 두개의 테이블을 inner join해서 sessionID필드로 매칭한다.
# 두 테이블에 sessionID가 매칭이 되는 것들만 리턴이 된다.
# 채널로 그룹핑을 한 뒤, 같은 채널 안의 레코드 중 userId의 값이 다른 것들만 리턴한다.
# 결과적으로 채널별로 유니크한 사용자 수가 카운트 되고, ORDER BY 1에 의해
# 채널의 오름차순 순으로 select된 레코드들을 보여준다.
# 이것이 새로운 데이터 프레임 channel_count_df이 된다.
# count(distinct userID)뒤에 uniqueUsers를 붙임으로써 필드의 이름을 지정할 수 있다.
channel_count_df = spark.sql("""
    SELECT channel, count(distinct userId) uniqueUsers
    FROM session_timestamp st
    JOIN user_session_channel usc ON st.sessionID = usc.sessionID
    GROUP BY 1
    ORDER BY 1
""")

In [22]:
channel_count_df # 스파크 클러스터 위에 있기 때문에 아직 별다른 정보가 없다.
# 두개의 필드가 있음을 알 수 있다.

DataFrame[channel: string, uniqueUsers: bigint]

In [23]:
channel_count_df.show() 
# channel_count_df에 어떤 내용들이 있는지 본다.
# 스파크는 lazy exacution이다. 
# 데이터 프레임으로 최종적으로 뭘 하기 전까지 스파크는 앞의 sql구문들을 실행하지 않는다.
# 따라서 이 .show()를 할 때 비로소 앞의 것들을 실행하기 때문에 좀 오래걸린다.
# show를 쓰면 뒷단에서 데이터프레임을 collect라는 함수로 결과를 파이썬 드라이버 프로그램 단으로 끌어온다.


+---------+-----------+
|  channel|uniqueUsers|
+---------+-----------+
| Facebook|        889|
|   Google|        893|
|Instagram|        895|
|    Naver|        882|
|  Organic|        895|
|  Youtube|        889|
+---------+-----------+



In [24]:
# like로 대소문자 구별해 문자 매칭
# user_session_channel 테이블의 레코드들을 읽어오는데
# 채널 이름에 소문자 o가 있는 것만 카운트한다.
# 이것을 channel_with_o_count_df라는 데이터프레임으로 저장한다.
channel_with_o_count_df = spark.sql("""
    SELECT COUNT(1)
    FROM user_session_channel
    WHERE channel like '%o%'
""")

In [25]:
# show를 할 수도 있지만 collect로 한번 불러보자.
# 채널 이름에 o가 들어가는 레코드는 50864개임을 알 수 있다.
channel_with_o_count_df.collect()

[Row(count(1)=50864)]

지금까지 spark에서 데이터 조작하는 방법으로 
1. spark 상의 데이터 프레임으로 바꾼 뒤 데이터 프레임에 다양한 함수들을(count등) 적용하는 것.
2. spark sql을 이용해 처리하는 것.(테이블을 만들어서)
데이터 조작에 최적화된 sql을 사용하는 2번이 더 좋다. 