<a href="https://colab.research.google.com/github/kgpark88/energy-bigdata-analysis/blob/main/spark_in_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# How to Use PySpark in Colab
- Reference: https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/

## Installing the Java Virtual Machine (JVM)

In [30]:
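# -y: answer apt-get's "Do you want to continue?" prompt automatically so the install does not stall
!apt-get install -y openjdk-8-jdk-headless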

Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-8-jdk-headless is already the newest version (8u312-b07-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 40 not upgraded.


## Installing Apache Spark

In [31]:
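# Older releases such as 3.1.2 are kept on archive.apache.org; downloads.apache.org only hosts current ones
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz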

In [32]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
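The download and extraction can also be scripted in Python, which makes the URL pattern explicit if you want a different Spark or Hadoop build. This is a minimal sketch: the helper names are hypothetical, and it assumes the Apache archive layout `https://archive.apache.org/dist/spark/spark-<version>/spark-<version>-bin-hadoop<hadoop>.tgz`, where older releases such as 3.1.2 remain available after they leave `downloads.apache.org`.

```python
import os
import tarfile
import urllib.request

# Assumption: older Spark releases are served from this archive path
ARCHIVE_BASE = "https://archive.apache.org/dist/spark"


def spark_package_name(version: str, hadoop: str) -> str:
    """Name of the prebuilt package, e.g. spark-3.1.2-bin-hadoop2.7."""
    return f"spark-{version}-bin-hadoop{hadoop}"


def spark_download_url(version: str, hadoop: str, base: str = ARCHIVE_BASE) -> str:
    """URL of the .tgz for a given Spark/Hadoop combination."""
    return f"{base}/spark-{version}/{spark_package_name(version, hadoop)}.tgz"


def download_and_extract(version: str, hadoop: str, dest: str = "/content") -> str:
    """Download the tarball unless it is already present, unpack it, return SPARK_HOME."""
    url = spark_download_url(version, hadoop)
    tgz_path = os.path.join(dest, url.rsplit("/", 1)[-1])
    if not os.path.exists(tgz_path):  # skip re-downloads (no *.tgz.1 copies)
        urllib.request.urlretrieve(url, tgz_path)
    with tarfile.open(tgz_path, "r:gz") as tar:
        tar.extractall(dest)
    return os.path.join(dest, spark_package_name(version, hadoop))
```

For example, `download_and_extract("3.1.2", "2.7")` should leave Spark in `/content/spark-3.1.2-bin-hadoop2.7`, the directory used for `SPARK_HOME` in the environment-variable step.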

In [33]:
!ls -lt

total 506184
-rw-r--r--  1 root root  25525678 Apr 24 03:14 train.csv
drwxr-xr-x  3 root root      4096 Apr 24 03:13 drive
-rw-r--r--  1 root root  13832437 Apr 24 03:10 ngrok-stable-linux-amd64.zip
drwxr-xr-x  1 root root      4096 Apr 19 14:23 sample_data
-rw-r--r--  1 root root 224445805 May 24  2021 spark-3.1.2-bin-hadoop2.7.tgz
-rw-r--r--  1 root root 224445805 May 24  2021 spark-3.1.2-bin-hadoop2.7.tgz.1
drwxr-xr-x 13 1000 1000      4096 May 24  2021 spark-3.1.2-bin-hadoop2.7
-rwxr-xr-x  1 root root  30053267 May  4  2021 ngrok


## Installing the findspark library

In [34]:
!pip install -q findspark

## Setting environment variables

In [35]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
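Before calling `findspark.init()`, it can help to confirm that both variables point at real installations, since a typo in either path tends to surface only later as a Java or import error. A small sketch with a hypothetical helper `check_spark_env`; it assumes the usual layouts, `$JAVA_HOME/bin/java` for the JDK and `$SPARK_HOME/bin/spark-submit` for Spark.

```python
import os


def check_spark_env(env=os.environ):
    """Return a list of problems with JAVA_HOME / SPARK_HOME (an empty list means they look OK)."""
    # Assumed layout: one executable we expect to find under each home directory
    expected = {
        "JAVA_HOME": os.path.join("bin", "java"),
        "SPARK_HOME": os.path.join("bin", "spark-submit"),
    }
    problems = []
    for var, rel_path in expected.items():
        home = env.get(var)
        if not home:
            problems.append(f"{var} is not set")
        elif not os.path.isfile(os.path.join(home, rel_path)):
            problems.append(f"{var}={home} has no {rel_path}")
    return problems


print(check_spark_env() or "JAVA_HOME and SPARK_HOME look OK")
```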

## Testing

In [36]:
import findspark
findspark.init()

In [37]:
findspark.find()

'/content/spark-3.1.2-bin-hadoop2.7'
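`findspark.init()` works by making the PySpark sources that ship inside `SPARK_HOME` importable. The sketch below is a simplified illustration of that idea, not findspark's actual code: it prepends `$SPARK_HOME/python` and the bundled `py4j-*.zip` from `$SPARK_HOME/python/lib` to `sys.path`. The function name `add_pyspark_to_path` is hypothetical.

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home: str) -> list:
    """Simplified illustration of findspark.init(): put PySpark and py4j on sys.path."""
    spark_python = os.path.join(spark_home, "python")
    # Spark bundles py4j as a zip under python/lib; the exact version varies by release
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    if not py4j_zips:
        raise FileNotFoundError(f"no py4j zip found under {spark_python}/lib")
    new_paths = [spark_python, py4j_zips[0]]
    # Prepend so these take precedence over any other pyspark on the path
    sys.path[:0] = [p for p in new_paths if p not in sys.path]
    os.environ["SPARK_HOME"] = spark_home
    return new_paths
```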

In [38]:
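from pyspark.sql import SparkSession

# master("local") runs Spark in this process with one worker thread ("local[*]" would use all cores);
# spark.ui.port moves the Spark UI off its default port 4040, which ngrok's local web interface also uses
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()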

In [39]:
spark

## Spark UI

In [None]:
# -nc skips the download if the zip is already present (avoids copies like *.zip.1);
# -o lets unzip overwrite an existing ngrok binary instead of stopping at the "replace?" prompt
!wget -nc https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip -o ngrok-stable-linux-amd64.zip

In [None]:
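# Start ngrok in the background, tunnelling to the Spark UI on port 4050
get_ipython().system_raw('./ngrok http 4050 &')
# ngrok's own local API listens on 4040; "public_url" in the returned JSON is the Spark UI address
!curl -s http://localhost:4040/api/tunnels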
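The `curl` call returns JSON, and the address to open is in the `public_url` field of each entry in its `tunnels` list. A minimal sketch that extracts it in Python, assuming that response shape from ngrok's local API on its default port 4040 (the helper names are hypothetical); it prefers the `https` tunnel when ngrok reports both http and https.

```python
import json
import urllib.request


def public_urls(tunnels_json: str) -> list:
    """Extract the public URLs from the JSON returned by ngrok's /api/tunnels endpoint."""
    data = json.loads(tunnels_json)
    return [t["public_url"] for t in data.get("tunnels", [])]


def pick_public_url(urls: list) -> str:
    """Prefer the https tunnel; raise if ngrok has not opened any tunnel yet."""
    if not urls:
        raise RuntimeError("ngrok reports no tunnels yet; wait a moment and retry")
    https = [u for u in urls if u.startswith("https://")]
    return (https or urls)[0]


def fetch_spark_ui_url(api: str = "http://localhost:4040/api/tunnels") -> str:
    """Query ngrok's local API (assumed default address) and return the Spark UI URL."""
    with urllib.request.urlopen(api) as resp:
        return pick_public_url(public_urls(resp.read().decode("utf-8")))
```

Calling `fetch_spark_ui_url()` right after starting ngrok may raise the `RuntimeError` because the tunnel takes a moment to come up; rerunning the call shortly afterwards is usually enough.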

## Loading data
- Data: https://www.kaggle.com/sdolezel/black-friday

In [None]:
df = spark.read.csv("train.csv", header=True, inferSchema=True)

In [None]:
df.printSchema()

In [None]:
df.show(5)

In [None]:
df.count()

## Using PySpark

### Viewing specific columns

In [None]:
df.select("User_ID","Gender","Age","Occupation").show(5)

### Summary statistics

In [None]:
df.describe().show()
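For each numeric column, `describe()` reports `count`, `mean`, `stddev`, `min` and `max`. As a plain-Python illustration of what those rows mean (a hypothetical helper, assuming the column is given as a list with `None` for nulls): nulls are left out of every statistic, and `stddev` is the sample standard deviation, with an n - 1 denominator.

```python
import statistics


def describe(values):
    """Plain-Python analogue of the per-column numbers df.describe() reports."""
    xs = [v for v in values if v is not None]  # aggregates skip nulls
    return {
        "count": len(xs),
        "mean": statistics.mean(xs),
        "stddev": statistics.stdev(xs),  # sample standard deviation (n - 1)
        "min": min(xs),
        "max": max(xs),
    }
```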

### Checking unique values of categorical columns

In [None]:
df.select("City_Category").distinct().show()

### Groupby aggregation


In [None]:
from pyspark.sql import functions as F
df.groupBy("City_Category").agg(F.sum("Purchase")).show()
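`groupBy(...).agg(F.sum(...))` produces one row per distinct key (the same keys `distinct()` lists above) holding the sum of that group's values. A plain-Python sketch of those semantics, using a hypothetical `group_sum` helper over rows given as dicts: nulls are skipped, and a group containing only nulls sums to `None`, as `F.sum` returns null in that case.

```python
def group_sum(rows, key, value):
    """Sum `value` per distinct `key`; nulls are skipped, all-null groups give None."""
    totals = {}
    for row in rows:
        k, v = row[key], row[value]
        totals.setdefault(k, None)  # every distinct key gets a row, even if all-null
        if v is not None:
            totals[k] = (totals[k] or 0) + v
    return totals
```

In the PySpark output the aggregated column is named `sum(Purchase)`; writing `F.sum("Purchase").alias("Total_Purchase")` would give it a friendlier name (`Total_Purchase` is only an example).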

### Checking and handling null values

In [None]:
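# when() returns the column name as a non-null literal only where c is null, so count() tallies the nulls
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()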

In [None]:
df = df.fillna({'Product_Category_2':0, 'Product_Category_3':0})

In [None]:
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()
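The two `select` cells count nulls per column before and after `fillna`. The same logic in plain Python, as a sketch with hypothetical helpers over rows represented as dicts (`None` standing for null): `null_counts` mirrors the `count(when(isnull(c), c))` expression, and `fill_nulls` mirrors `fillna` with a `{column: value}` dict, leaving columns not in the dict untouched.

```python
def null_counts(rows, columns):
    """Count None values per column, like the count(when(isnull(c), c)) select."""
    return {c: sum(1 for row in rows if row[c] is None) for c in columns}


def fill_nulls(rows, defaults):
    """Return new rows with None replaced per column, like df.fillna({col: value})."""
    return [
        {c: (defaults[c] if v is None and c in defaults else v) for c, v in row.items()}
        for row in rows
    ]
```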

## Saving data

In [None]:
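# header=True keeps the column names; mode="overwrite" avoids the default error when the path already exists
df.write.csv("/content/drive/MyDrive/preprocessed_data", header=True, mode="overwrite")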

In [None]:
df.rdd.getNumPartitions()
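`df.write.csv(...)` writes a directory rather than a single file: Spark produces one `part-*` file per partition (the number `getNumPartitions()` reports) plus marker files such as `_SUCCESS`. If one CSV file is needed, Spark can write a single partition with `df.coalesce(1).write...`; alternatively the part files can be concatenated afterwards, as in this plain-Python sketch. It assumes the parts match `part-*.csv` and, when `has_header=True`, that each part starts with its own header line, so only the first header is kept. `merge_part_files` is a hypothetical helper.

```python
import glob
import os


def merge_part_files(out_dir: str, target: str, has_header: bool = True) -> int:
    """Concatenate Spark's part-*.csv files into one CSV; return the number of data lines."""
    parts = sorted(glob.glob(os.path.join(out_dir, "part-*.csv")))  # ignores _SUCCESS etc.
    header_written = False
    data_lines = 0  # counts physical lines, so quoted multi-line fields count more than once
    with open(target, "w", encoding="utf-8", newline="") as out:
        for path in parts:
            with open(path, encoding="utf-8", newline="") as f:
                lines = f.readlines()
            if lines and not lines[-1].endswith("\n"):
                lines[-1] += "\n"  # keep files from running together
            if has_header and lines:
                header, lines = lines[0], lines[1:]
                if not header_written:  # write the header once, from the first part
                    out.write(header)
                    header_written = True
            out.writelines(lines)
            data_lines += len(lines)
    return data_lines
```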

## Saving via a Pandas DataFrame

In [None]:
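# toPandas() collects the whole DataFrame into driver memory, so it only suits data that fits there
df_pd = df.toPandas()
# index=False: don't write pandas' row index as an extra unnamed first column
df_pd.to_csv("/content/drive/MyDrive/pandas_preprocessed_data.csv", index=False)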