#  Colab에서 PySpark 사용하는 방법
- 참조 : https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/

## Java Virtual Machine (JVM) 설치

In [3]:
!apt-get install openjdk-8-jdk-headless 

Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-8-jdk-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 39 not upgraded.


## Apache Spark 설치

In [18]:
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz 

In [22]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [23]:
!ls -lt

total 219200
drwx------  5 root root      4096 Jun  4 15:33 drive
drwxr-xr-x  1 root root      4096 Jun  1 13:40 sample_data
-rw-r--r--  1 root root 224445805 May 24 05:01 spark-3.1.2-bin-hadoop2.7.tgz
drwxr-xr-x 13 1000 1000      4096 May 24 05:00 spark-3.1.2-bin-hadoop2.7


##  findspark 라이브러리 설치

In [25]:
!pip install -q findspark

## 환경변수 설정

In [28]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

## 테스트

In [29]:
import findspark
findspark.init()

In [30]:
findspark.find()

'/content/spark-3.1.2-bin-hadoop2.7'

In [33]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [34]:
spark

## Spark UI

In [35]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

--2021-06-04 15:44:13--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 3.227.65.201, 34.196.68.240, 54.204.235.80, ...
Connecting to bin.equinox.io (bin.equinox.io)|3.227.65.201|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2021-06-04 15:44:14 (36.4 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
{"tunnels":[],"uri":"/api/tunnels"}


In [36]:
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://640fa42e5f9e.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://640fa42e5f9e.ngrok.io","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


## 데이터 로딩
- 데이터 : https://www.kaggle.com/sdolezel/black-friday

In [38]:
df = spark.read.csv("train.csv", header=True, inferSchema=True)

In [39]:
df.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [40]:
df.show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|              null|              null|    1422|
|100

In [41]:
df.count()

519514

## PySpark 사용법

### 특정 컬럼 데이터 보기

In [42]:
df.select("User_ID","Gender","Age","Occupation").show(5)

+-------+------+----+----------+
|User_ID|Gender| Age|Occupation|
+-------+------+----+----------+
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000002|     M| 55+|        16|
+-------+------+----+----------+
only showing top 5 rows



### 컬럼

In [43]:
df.describe().show()

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            519514|    519514|519514|519514|           519514|       519514|                    519514|             519514|            519514|            358129|            158717|           519514|
|   mean|1002977.6155849505|      null|  null|  null|8.081037277147487|         null|        1.4688874978660444|0.40954622974549293|  5.29633657610767| 9.84075291305646

### 범주형 컬럼(Categorical columns)의 유일값 확인

In [44]:
df.select("City_Category").distinct().show()

+-------------+
|City_Category|
+-------------+
|            B|
|            C|
|            A|
+-------------+



### Groupby 집계


In [45]:
from pyspark.sql import functions as F
df.groupBy("City_Category").agg(F.sum("Purchase")).show()

+-------------+-------------+
|City_Category|sum(Purchase)|
+-------------+-------------+
|            B|   2010372182|
|            C|   1582187796|
|            A|   1255290136|
+-------------+-------------+



### Null value 확인 및 처리

In [46]:
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|            161385|            360797|       0|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



In [47]:
df = df.fillna({'Product_Category_2':0, 'Product_Category_3':0})

In [48]:
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|                 0|                 0|       0|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



## 데이터 저장

In [51]:
df.write.csv("/content/drive/MyDrive/preprocessed_data")

AnalysisException: ignored

In [50]:
df.rdd.getNumPartitions()

1

## Pandas 데이터프레임으로 저장

In [52]:
df_pd = df.toPandas()
df_pd.to_csv("/content/drive/MyDrive/pandas_preprocessed_data.csv")