# Getting Started with PySpark in Google Colab

PySpark is the Python interface to Apache Spark. It is used mainly to work with large datasets (big data) and to build data pipelines.

Although Apache Spark is built for big data, you do not need a large dataset to benefit from PySpark while learning. Spark SQL is a good tool for data analysis, and the pandas library becomes slow once the data grows large.

Apache Spark reference: http://spark.apache.org/docs/latest/api/python/

# 1. Installing PySpark in Google Colab

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = (
    SparkSession.builder
    .appName("Our First Spark Example")
    .getOrCreate()
)

spark

Starting Spark

In [None]:
spark

# 2. Reading Data

As an example of reading a data source, we will use a public dataset in CSV format.
Data source:
https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/owid-covid-data.csv

In [None]:
import requests
path = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/owid-covid-data.csv"
req = requests.get(path)
url_content = req.content

csv_file_name = 'owid-covid-data.csv'
# Write the downloaded bytes to disk; the with-block closes the file automatically
with open(csv_file_name, 'wb') as csv_file:
    csv_file.write(url_content)

df = spark.read.csv('/content/'+csv_file_name, header=True, inferSchema=True)

# 3. PySpark DataFrames

A **DataFrame** is a two-dimensional data structure, similar to a table in Microsoft Excel. In Python, the DataFrame structure is best known as an object in the pandas library; PySpark provides its own DataFrame class, which is the one used in this section.

In [None]:
#Viewing the dataframe schema
df.printSchema()

In [None]:
#Converting date column
df.select(F.to_date(df.date).alias('date'))

In [None]:
#Summary stats
df.describe().show()

In [None]:
#DataFrame Filtering
df.filter(df.location == "United States").orderBy(F.desc("date")).show()

In [None]:
#Simple Group by Function
df.groupBy("location").sum("new_cases").orderBy(F.desc("sum(new_cases)")).show(truncate=False)

# 4. Spark SQL

One of the strengths of Apache Spark is its SQL module, which makes it much easier to interact with data in Spark.

In [None]:
#Creating a table from the dataframe
df.createOrReplaceTempView("covid_data") #temporary view
# df.write.saveAsTable("covid_data") #Save as a table
# df.write.mode("overwrite").saveAsTable("covid_data") #Save as a table and overwrite the table if it exists

In [None]:

df2 = spark.sql("SELECT * from covid_data")
df2.printSchema()
df2.show()

In [None]:
groupDF = spark.sql("SELECT location, count(*) from covid_data group by location")
groupDF.show()

# 5. Assignment: Working with a Dataset
Using a dataset in the Google Colab session:

https://github.com/rahmadsa/dataset/blob/main/sample_csv.csv

In [None]:
df = spark.read.csv("/sample_csv.csv", header=True, inferSchema=True)

In [None]:
df.printSchema()

In [None]:
#print N rows
df.show(5)

In [None]:
df.count()

17000

In [None]:
df.select("order_date","province").show(5)

In [None]:
df.describe().show()

In [None]:
df.select('item_price').distinct().show()

+-----------+
|total_rooms|
+-----------+
|      934.0|
|     3980.0|
|     4142.0|
|      596.0|
|     1761.0|
|     5983.0|
|     2815.0|
|     6433.0|
|      299.0|
|     2734.0|
|      769.0|
|     1051.0|
|     7554.0|
|     4066.0|
|     2862.0|
|     3597.0|
|      692.0|
|      720.0|
|     1765.0|
|     2523.0|
+-----------+
only showing top 20 rows



In [None]:
from pyspark.sql import functions as F
test = df.groupBy('item_price').agg(F.sum('city'))

In [None]:
test.toPandas()

In [None]:
#Counting missing (null) values in each column

df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

# Creating a Test Spark DataFrame

In [None]:
data = [
        ('John','Smith',1),
        ('Jane','Smith',2),
        ('Jonas','Smith',3),
]

# The column names must match the three fields in each tuple above
columns = ["firstname","lastname","id"]
df = spark.createDataFrame(data=data, schema = columns)

In [None]:
df

# Spark Tips and Tricks

Example code snippets for common or tricky uses of Apache Spark.

## Pandas DataFrame to Spark DataFrame

In [None]:
import pandas as pd
import numpy as np

df = pd.DataFrame(np.random.randint(100,size=(1000, 3)),columns=['A','B','C'])
spark_df = spark.createDataFrame(df)
spark_df.show()

In [None]:
#Convert object columns in the pandas dataframe to strings
for i in df.select_dtypes(include='object').columns.tolist():
    df[i] = df[i].astype(str)

#Convert datetimes to UTC
for i in [col for col in df.columns if df[col].dtype == 'datetime64[ns]']:
    df[i] = pd.to_datetime(df[i], utc=True)

#Replace nan and "None" in the pandas dataframe with null in the spark dataframe
spark_df = spark.createDataFrame(df).replace('None', None).replace(float('nan'), None)

## Window Functions

In [None]:
data = [
        (1,'2021-01-01 10:00:00'),
        (1,'2021-01-01 11:00:00'),
        (1,'2021-01-01 12:00:00'),
        (2,'2021-01-01 12:00:00'),
        (2,'2021-01-01 13:00:00'),
        (2,'2021-01-01 14:00:00'),
]

columns = ["id","datetime"]
df = spark.createDataFrame(data=data, schema = columns)
df.createOrReplaceTempView("window_test")
df.show()

In [None]:
#Selecting the min and max within a specific group
#(no ORDER BY inside OVER, so each aggregate covers the whole partition rather than a running frame)
spark.sql('''
Select
  id,

  max(datetime) OVER (Partition BY id) as max_date,
  min(datetime) OVER (Partition BY id) as min_date,

  ROW_NUMBER() OVER (Partition BY id ORDER BY datetime) as row_number

  FROM window_test

''').show()
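
One detail worth keeping in mind with aggregates in a window: when the `OVER` clause contains an `ORDER BY` and no explicit frame, Spark's default frame runs from the start of the partition to the current row, so the aggregate becomes a running value. The plain-Python sketch below (no Spark needed; the helper names `partition_max` and `running_max` are hypothetical) mimics both behaviours on the same rows as `window_test`.

```python
from itertools import groupby
from operator import itemgetter

rows = [
    (1, "2021-01-01 10:00:00"),
    (1, "2021-01-01 11:00:00"),
    (1, "2021-01-01 12:00:00"),
    (2, "2021-01-01 12:00:00"),
    (2, "2021-01-01 13:00:00"),
    (2, "2021-01-01 14:00:00"),
]

def partition_max(rows):
    """Mimics MAX(datetime) OVER (PARTITION BY id): one value per group."""
    out = []
    for key, grp in groupby(sorted(rows), key=itemgetter(0)):
        grp = list(grp)
        top = max(dt for _, dt in grp)
        out.extend((key, dt, top) for _, dt in grp)
    return out

def running_max(rows):
    """Mimics MAX(datetime) OVER (PARTITION BY id ORDER BY datetime).

    Assumes no duplicate datetimes inside a group (ties are not modelled).
    """
    out = []
    for key, grp in groupby(sorted(rows), key=itemgetter(0)):
        best = None
        for _, dt in grp:
            best = dt if best is None or dt > best else best
            out.append((key, dt, best))
    return out

whole_group = partition_max(rows)
running = running_max(rows)
print(whole_group[0])  # first row of id 1 sees the group-wide maximum
print(running[0])      # first row of id 1 only sees itself
```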

In [None]:
# Selecting the row number or order rank for each row within a specified grouping.
# This is great for sub rankings in a table

spark.sql('''
Select
  id,
  datetime,

  ROW_NUMBER() OVER (Partition BY id ORDER BY datetime) as row_number

  FROM window_test

''').show()

## De-duplicating rows in a dataset using a window function

In [None]:
data = [
        (1,'2021-01-01',100,'A'),
        (1,'2021-01-31',105,'A'),
        (2,'2021-02-04',160,'B'),
        (2,'2021-02-07',145,'B'),
]

columns = ["id","date","score","type"]
df = spark.createDataFrame(data=data, schema = columns)
df.createOrReplaceTempView("window_test")
df.show()

In [None]:
df2 = spark.sql("""
WITH T AS (
  SELECT
  *,
  ROW_NUMBER() OVER (PARTITION BY id ORDER BY date DESC) AS version_number
  FROM window_test
)

SELECT * FROM T WHERE version_number = 1;

""")

df2.show()

In [None]:
spark.sql("""
  SELECT
  *,
  SUM(score) OVER (PARTITION by type ORDER BY date) as score_cumulative
  FROM window_test

""").show()

+---+----------+-----+----+----------------+
| id|      date|score|type|score_cumulative|
+---+----------+-----+----+----------------+
|  1|2021-01-01|  100|   A|             100|
|  1|2021-01-31|  105|   A|             205|
|  2|2021-02-04|  160|   B|             160|
|  2|2021-02-07|  145|   B|             305|
+---+----------+-----+----+----------------+
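
As a sanity check on the `score_cumulative` column, the running total can be reproduced in plain Python. This is only a sketch of what the window computes, not of how Spark executes it; the helper name `cumulative_by_type` is hypothetical.

```python
from itertools import accumulate, groupby
from operator import itemgetter

rows = [
    (1, "2021-01-01", 100, "A"),
    (1, "2021-01-31", 105, "A"),
    (2, "2021-02-04", 160, "B"),
    (2, "2021-02-07", 145, "B"),
]

def cumulative_by_type(rows):
    """Append a running sum of score within each type, ordered by date."""
    # Sort by (type, date) so every group is contiguous and in date order
    ordered = sorted(rows, key=lambda r: (r[3], r[1]))
    result = []
    for _, group in groupby(ordered, key=itemgetter(3)):
        group = list(group)
        running = accumulate(score for _, _, score, _ in group)
        result.extend(row + (total,) for row, total in zip(group, running))
    return result

cumulative = cumulative_by_type(rows)
for row in cumulative:
    print(row)
```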



## Limiting results per group with a window function

In [None]:
import pandas as pd
import numpy as np

df = pd.DataFrame(
np.hstack((
    np.random.randint(1,5,size=(100000, 1)),
    np.random.randint(100,size=(100000, 1))
))
, columns=['company_id', 'number'])

dff = spark.createDataFrame(df)
dff.createOrReplaceTempView("window_test_limits")


In [None]:
spark.sql("""
WITH T AS (
  SELECT
    company_id,
    number,
    ROW_NUMBER() OVER (PARTITION BY company_id ORDER BY number) AS row_number
  FROM window_test_limits
    )

SELECT * FROM T WHERE row_number <= 100

""").show()

+----------+------+----------+
|company_id|number|row_number|
+----------+------+----------+
|         1|     0|         1|
|         1|     0|         2|
|         1|     0|         3|
|         1|     0|         4|
|         1|     0|         5|
|         1|     0|         6|
|         1|     0|         7|
|         1|     0|         8|
|         1|     0|         9|
|         1|     0|        10|
|         1|     0|        11|
|         1|     0|        12|
|         1|     0|        13|
|         1|     0|        14|
|         1|     0|        15|
|         1|     0|        16|
|         1|     0|        17|
|         1|     0|        18|
|         1|     0|        19|
|         1|     0|        20|
+----------+------+----------+
only showing top 20 rows



## Calculating a 7-day moving average

In [None]:
df = pd.DataFrame(pd.date_range('1/1/2022','1/31/2022',freq='D'), columns=['date'])
import random
df['company_id'] = 1
df['number'] = df.apply(lambda x: random.randint(0,100), axis = 1)

dff = spark.createDataFrame(df)
dff.createOrReplaceTempView("window_data")

dff.show()

In [None]:
spark.sql("""
SELECT
  date,
  company_id,
  number,
  AVG(number) OVER (PARTITION BY company_id ORDER BY date ASC RANGE BETWEEN INTERVAL 6 DAYS PRECEDING AND CURRENT ROW) as last_7_day_avg
FROM window_data
""").show()
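
The frame `INTERVAL 6 DAYS PRECEDING AND CURRENT ROW` covers seven calendar days: the current day plus the six before it. The plain-Python sketch below shows that arithmetic on a small made-up series (the values 1 to 10 are an assumption chosen so the averages are easy to verify by hand).

```python
from datetime import date, timedelta

def trailing_average(series, days=7):
    """Average of the values dated within the last `days` days, inclusive.

    `series` is a list of (date, number) pairs for a single company.
    """
    out = []
    for current, _ in series:
        start = current - timedelta(days=days - 1)
        window = [n for d, n in series if start <= d <= current]
        out.append((current, sum(window) / len(window)))
    return out

# Hypothetical data: 1 Jan 2022 has value 1, 2 Jan has value 2, and so on
series = [(date(2022, 1, 1) + timedelta(days=i), i + 1) for i in range(10)]
averages = trailing_average(series)

for day, avg in averages:
    print(day, avg)
```

For the first six days the window holds fewer than seven values, so the average is taken over whatever is available, which matches how a range frame behaves at the start of a partition.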

## Monthly Active Users

In [None]:
import pandas as pd
df = pd.DataFrame(pd.date_range('1/1/2022','1/31/2022',freq='D'), columns=['login_date'])
import random
df['company_id'] = 1
df['user_id'] = df.apply(lambda x: random.randint(0,3), axis = 1)

dff = spark.createDataFrame(df)
dff.createOrReplaceTempView("users_data")

dff.show()

In [None]:
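#Partition by company (not by login_date) so the window can reach back over earlier days.
#COUNT(DISTINCT ...) is not allowed over a window, so collect the distinct user ids and take the size.
spark.sql("""
SELECT
  login_date,
  SIZE(COLLECT_SET(user_id) OVER (PARTITION BY company_id ORDER BY login_date ASC RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW)) AS monthly_active_users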
  FROM users_data
""").show()
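
As a reference for what a monthly-active-users figure should contain, the sketch below counts distinct users of one company inside a trailing 30-day window in plain Python. The login events and the helper name `active_users` are assumptions made up for illustration.

```python
from datetime import date, timedelta

# Hypothetical login events: (login_date, company_id, user_id)
logins = [
    (date(2022, 1, 1), 1, 0),
    (date(2022, 1, 2), 1, 1),
    (date(2022, 1, 2), 1, 0),
    (date(2022, 1, 20), 1, 2),
    (date(2022, 2, 15), 1, 1),
]

def active_users(logins, company_id, as_of, days=30):
    """Distinct users of a company who logged in from as_of - days to as_of."""
    start = as_of - timedelta(days=days)
    return len({
        user
        for login_date, company, user in logins
        if company == company_id and start <= login_date <= as_of
    })

mau_jan_31 = active_users(logins, 1, date(2022, 1, 31))
mau_feb_15 = active_users(logins, 1, date(2022, 2, 15))
print(mau_jan_31, mau_feb_15)
```

Using a set is what makes this a count of users rather than a count of logins: user 0 logs in twice in January but is counted once.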

## Finding the time difference between dataset rows using a window function

In [None]:
data = [
        (1,'start','2021-01-01',100,'A'),
        (1,'end','2021-01-31',200,'A'),
        (2,'start','2021-03-05 4:53:11',100,'A'),
        (2,'end','2021-05-01 05:06:38',200,'A'),
]

columns = ["id","session","datetime","station_return","type"]
df = spark.createDataFrame(data=data, schema = columns)
df.createOrReplaceTempView("window_test")
df.show()

+---+-------+-------------------+--------------+----+
| id|session|           datetime|station_return|type|
+---+-------+-------------------+--------------+----+
|  1|  start|         2021-01-01|           100|   A|
|  1|    end|         2021-01-31|           200|   A|
|  2|  start| 2021-03-05 4:53:11|           100|   A|
|  2|    end|2021-05-01 05:06:38|           200|   A|
+---+-------+-------------------+--------------+----+



In [None]:
spark.sql('''
SELECT
  id,
  datetime,
  lead(datetime) OVER (PARTITION BY id ORDER BY datetime) as next_datetime,
  DATEDIFF(lead(datetime) OVER (PARTITION BY id ORDER BY datetime),datetime) as duration_in_days

FROM window_test

''').show()
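
To check the `duration_in_days` values by hand, the day differences can be recomputed with the standard library. This sketch assumes, as `DATEDIFF` does, that only the calendar dates matter and the time of day is ignored; the helper name `parse_datetime` is hypothetical.

```python
from datetime import datetime

def parse_datetime(value):
    """Parse 'YYYY-MM-DD' with an optional ' H:MM:SS' time part."""
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            pass
    raise ValueError(f"unrecognised datetime: {value!r}")

# Start and end values per id, copied from the window_test rows
sessions = {
    1: ("2021-01-01", "2021-01-31"),
    2: ("2021-03-05 4:53:11", "2021-05-01 05:06:38"),
}

durations = {}
for session_id, (start, end) in sessions.items():
    start_dt, end_dt = parse_datetime(start), parse_datetime(end)
    # Compare calendar dates only, dropping the time of day
    durations[session_id] = (end_dt.date() - start_dt.date()).days

print(durations)
```

The last row of each `id` has no following row, so in the SQL result `lead(datetime)` and the duration are null there.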