## PySpark Installation

In [None]:
!pip install pyspark==3.3.1 py4j==0.10.9.5

Collecting pyspark==3.3.1
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m18.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845500 sha256=e03c1a19d9514fd7442dc9871fc476d40eeeb1824f3d656b6a8a0c9f1159b258
  Stored in directory: /root/.cache/pip/wheels/0f/f0/3d/517368b8ce80486e84f89f214e0a022554e4ee64969f46279b
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninstall

## Download the data to be processed

In [None]:
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv

--2023-07-15 02:39:20--  https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv
Resolving s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)... 52.92.145.10, 52.92.128.66, 52.92.179.98, ...
Connecting to s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)|52.92.145.10|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 62728 (61K) [text/csv]
Saving to: ‘1800.csv’


2023-07-15 02:39:21 (387 KB/s) - ‘1800.csv’ saved [62728/62728]



In [None]:
!ls -tl

total 68
drwxr-xr-x 1 root root  4096 Jul 13 13:33 sample_data
-rw-r--r-- 1 root root 62728 Apr 10  2022 1800.csv


In [None]:
!head -5 1800.csv

ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,


## Process with a pandas DataFrame

In [None]:
import pandas as pd

# 1800.csv has no header row. Name the first four fields and ignore the remaining four.
pd_df = pd.read_csv(
    "1800.csv",
    header=None,
    usecols=[0, 1, 2, 3],
    names=["stationID", "date", "measure_type", "temperature"],
)

In [None]:
# First five rows of the loaded frame
pd_df.head(5)

Unnamed: 0,stationID,date,measure_type,temperature
0,ITE00100554,18000101,TMAX,-75
1,ITE00100554,18000101,TMIN,-148
2,GM000010962,18000101,PRCP,0
3,EZE00100082,18000101,TMAX,-86
4,EZE00100082,18000101,TMIN,-135


In [None]:
# Keep only the TMIN rows; every other measure type is dropped
pd_minTemps = pd_df.loc[pd_df["measure_type"].eq("TMIN")]

In [None]:
# First five TMIN rows (the original index labels are preserved)
pd_minTemps.head(n=5)

Unnamed: 0,stationID,date,measure_type,temperature
1,ITE00100554,18000101,TMIN,-148
4,EZE00100082,18000101,TMIN,-135
6,ITE00100554,18000102,TMIN,-125
9,EZE00100082,18000102,TMIN,-130
11,ITE00100554,18000103,TMIN,-46


In [None]:
# Project down to the two columns needed for the per-station aggregate
pd_stationTemps = pd_minTemps.loc[:, ["stationID", "temperature"]]

In [None]:
# First ten rows by position
pd_stationTemps.iloc[:10]

Unnamed: 0,stationID,temperature
1,ITE00100554,-148
4,EZE00100082,-135
6,ITE00100554,-125
9,EZE00100082,-130
11,ITE00100554,-46
14,EZE00100082,-73
16,ITE00100554,-13
19,EZE00100082,-74
21,ITE00100554,-6
24,EZE00100082,-58


In [None]:
# Aggregate to find the minimum temperature for every station.
# Select the column first, then call .min(). The first positional parameter of
# GroupBy.min is `numeric_only`, not a column name, so .min("temperature") only
# worked because the string is truthy and the column happens to be numeric.
pd_minTempsByStation = pd_stationTemps.groupby("stationID")[["temperature"]].min()
pd_minTempsByStation.head()

Unnamed: 0_level_0,temperature
stationID,Unnamed: 1_level_1
EZE00100082,-135
ITE00100554,-148


In [None]:
# Equivalent SQL for the pandas steps above: filter to TMIN rows, then take the
# per-station minimum. This cell only displays the query text; nothing executes it.
"""
SELECT stationID, MIN(temperature)
FROM pd_df
WHERE measure_type = 'TMIN'
GROUP BY 1
"""

"\nSELECT stationID, MIN(temperature)\nFROM pd_df\nWHERE measure_type = 'TMIN'\nGROUP BY 1\n"

## Process with Spark

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Local Spark using all available cores, under a descriptive application name.
# setAppName/setMaster set "spark.app.name"/"spark.master" and return the conf for chaining.
conf = (
    SparkConf()
    .setAppName("PySpark DataFrame #1")
    .setMaster("local[*]")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [None]:
# Read the file as plain text: every line becomes one row in a single string column `value`
df = spark.read.text("1800.csv")

In [None]:
# Show column names, types, and nullability of the text-loaded df
df.printSchema()

root
 |-- value: string (nullable = true)



In [None]:
# Read as CSV without a schema (every column is a string) and name all eight columns
df = spark.read.csv("1800.csv").toDF(
    "stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7"
)

In [None]:
# Without schema inference, every CSV column is read as a string
df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: string (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [None]:
# Let Spark scan the data and infer column types, then name all eight columns
df = spark.read.csv("1800.csv", inferSchema=True).toDF(
    "stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7"
)

In [None]:
# With inferSchema, date and temperature come back as integers
df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: integer (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [None]:
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType

# Explicit schema for the first four CSV fields; every field is nullable.
# Brackets already continue the expression, so no backslashes are needed.
schema = StructType(
    [
        StructField("stationID", StringType(), nullable=True),
        StructField("date", IntegerType(), nullable=True),
        StructField("measure_type", StringType(), nullable=True),
        StructField("temperature", FloatType(), nullable=True),
    ]
)

In [None]:
# Read the CSV with the explicit schema instead of inferring one.
# Equivalent to spark.read.schema(schema).format("csv").load("1800.csv").
df = spark.read.csv("1800.csv", schema=schema)

In [None]:
# Count the rows loaded into df
df.count()

1825

In [None]:
# Number of partitions in the RDD underlying df
df.rdd.getNumPartitions()

1

In [None]:
# Only the four fields declared in `schema` remain, with the declared types
df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: float (nullable = true)



In [None]:
# Keep only the TMIN rows by passing a Column predicate to filter()
minTemps = df.filter(df["measure_type"] == "TMIN")

In [None]:
# Number of TMIN rows kept by filter()
minTemps.count()

730

In [None]:
# Apply the same filter with where() and a Column expression
minTemps = df.where(df["measure_type"] == "TMIN")

In [None]:
# Same count as the filter() version
minTemps.count()

730

In [None]:
# Apply the filter with a SQL expression string
minTemps = df.where("measure_type = 'TMIN'")

In [None]:
# Same count as the Column-expression versions
minTemps.count()

730

In [None]:
# Aggregate to find the minimum temperature for every station.
# A {column: function} dict produces the same result column name, min(temperature).
minTempsByStation = minTemps.groupBy("stationID").agg({"temperature": "min"})
minTempsByStation.show()

+-----------+----------------+
|  stationID|min(temperature)|
+-----------+----------------+
|ITE00100554|          -148.0|
|EZE00100082|          -135.0|
+-----------+----------------+



In [None]:
# Select only stationID and temperature, using pandas-style list indexing on a Spark DataFrame
stationTemps = minTemps[["stationID", "temperature"]]

In [None]:
# Print the first five rows as a text table
stationTemps.show(n=5)

+-----------+-----------+
|  stationID|temperature|
+-----------+-----------+
|ITE00100554|     -148.0|
|EZE00100082|     -135.0|
|ITE00100554|     -125.0|
|EZE00100082|     -130.0|
|ITE00100554|      -46.0|
+-----------+-----------+
only showing top 5 rows



In [None]:
# Same projection using select(); a single list of column names is also accepted
stationTemps = minTemps.select(["stationID", "temperature"])

In [None]:
# Print the first five rows; identical to the list-indexing result above
stationTemps.show(n=5)

+-----------+-----------+
|  stationID|temperature|
+-----------+-----------+
|ITE00100554|     -148.0|
|EZE00100082|     -135.0|
|ITE00100554|     -125.0|
|EZE00100082|     -130.0|
|ITE00100554|      -46.0|
+-----------+-----------+
only showing top 5 rows



In [None]:
# Collect the per-station minimums to the driver as a Python list of Row objects;
# they are formatted and printed in the next cell.
results = minTempsByStation.collect()

In [None]:
# Print each station's minimum temperature in degrees Fahrenheit.
# The raw values are not Fahrenheit: printing e.g. -148 as "-148.00F" is wrong.
# 1800.csv appears to follow the NOAA GHCN-Daily layout (station ID, YYYYMMDD
# date, TMAX/TMIN/PRCP element codes), where TMIN is stored in tenths of a degree
# Celsius. NOTE(review): confirm the units against the data source. The values
# are converted here before being labelled "F".
for station_id, min_temp in results:
    fahrenheit = min_temp * 0.1 * (9.0 / 5.0) + 32.0
    print(f"{station_id}\t{fahrenheit:.2f}F")

ITE00100554	-148.00F
EZE00100082	-135.00F


## Process with Spark SQL

In [None]:
# Print the first 20 rows (the show() default) of the schema-typed frame
df.show(n=20)

+-----------+--------+------------+-----------+
|  stationID|    date|measure_type|temperature|
+-----------+--------+------------+-----------+
|ITE00100554|18000101|        TMAX|      -75.0|
|ITE00100554|18000101|        TMIN|     -148.0|
|GM000010962|18000101|        PRCP|        0.0|
|EZE00100082|18000101|        TMAX|      -86.0|
|EZE00100082|18000101|        TMIN|     -135.0|
|ITE00100554|18000102|        TMAX|      -60.0|
|ITE00100554|18000102|        TMIN|     -125.0|
|GM000010962|18000102|        PRCP|        0.0|
|EZE00100082|18000102|        TMAX|      -44.0|
|EZE00100082|18000102|        TMIN|     -130.0|
|ITE00100554|18000103|        TMAX|      -23.0|
|ITE00100554|18000103|        TMIN|      -46.0|
|GM000010962|18000103|        PRCP|        4.0|
|EZE00100082|18000103|        TMAX|      -10.0|
|EZE00100082|18000103|        TMIN|      -73.0|
|ITE00100554|18000104|        TMAX|        0.0|
|ITE00100554|18000104|        TMIN|      -13.0|
|GM000010962|18000104|        PRCP|     

In [None]:
# Register df as the temporary view "station1800" so spark.sql can query it by name
df.createOrReplaceTempView("station1800")

In [None]:
# Same TMIN filter and per-station minimum, expressed in Spark SQL against the temp view
results = spark.sql(
    """SELECT stationID, MIN(temperature)
FROM station1800
WHERE measure_type = 'TMIN'
GROUP BY 1"""
)

In [None]:
# Print the query result with the show() defaults spelled out
results.show(n=20, truncate=True)

+-----------+----------------+
|  stationID|min(temperature)|
+-----------+----------------+
|ITE00100554|          -148.0|
|EZE00100082|          -135.0|
+-----------+----------------+



In [None]:
driver_results = results.collect()
# Each pyspark.sql.Row is one DataFrame record, and its fields can be accessed by name.
for row in driver_results:
    print(row)

Row(stationID='ITE00100554', min(temperature)=-148.0)
Row(stationID='EZE00100082', min(temperature)=-135.0)
