<a href="https://colab.research.google.com/github/kingodjerry/spark_study/blob/main/spark(pyspark).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## PySpark 설치
Spark session은 Spark 2.0부터 엔트리 포인트로 사용됨   
Spark session을 이용해 RDD, 데이터 프레임 등을 만듦   
Spark session은 SparkSession.builder를 호출하여 생성하며 다양한 함수들을 통해 세부 설정 가능   
(3.5.3에서는 py4j를 추가 설치하지 않아도 됨)

- `local[*]`: Spark이 하나의 JVM으로 동작하고, 그 안에서 컴퓨터의 코어 수만큼의 스레드가 Executor로 동작함

In [1]:
!pip install pyspark==3.5.3



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the session that serves as the entry point for this notebook.
# The "local[*]" master runs Spark inside a single local JVM.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PySpark Tutorial")
    .getOrCreate()
)

In [4]:
# Display the SparkSession object as this cell's output.
spark

In [5]:
# Print the CPU details of the machine hosting this notebook.
!lscpu

Architecture:             x86_64
  CPU op-mode(s):         32-bit, 64-bit
  Address sizes:          46 bits physical, 48 bits virtual
  Byte Order:             Little Endian
CPU(s):                   2
  On-line CPU(s) list:    0,1
Vendor ID:                GenuineIntel
  Model name:             Intel(R) Xeon(R) CPU @ 2.20GHz
    CPU family:           6
    Model:                79
    Thread(s) per core:   2
    Core(s) per socket:   1
    Socket(s):            1
    Stepping:             0
    BogoMIPS:             4399.99
    Flags:                fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 cl
                          flush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc re
                          p_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3
                           fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand
                           hypervisor lahf_lm abm 3dnowprefetch i

In [6]:
# Print the MemTotal line from /proc/meminfo (total memory of the host).
!grep MemTotal /proc/meminfo

MemTotal:       13290460 kB


## 헤더가 없는 CSV 파일 처리하기
data : https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv   
❖ 데이터에 스키마 지정하기   
❖ SparkConf 사용해보기   
❖ measure_type값이 TMIN인 레코드 대상으로 stationId별 최소 온도 찾기   

In [7]:
# Download the sample data set. -nc (no-clobber) skips the download when
# 1800.csv already exists, so re-running the cell does not pile up duplicate
# copies such as 1800.csv.1.
!wget -nc https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv

--2024-11-30 11:30:46--  https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv
Resolving s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)... 3.5.83.12, 52.92.207.154, 3.5.76.4, ...
Connecting to s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)|3.5.83.12|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 62728 (61K) [text/csv]
Saving to: ‘1800.csv’


2024-11-30 11:30:47 (226 KB/s) - ‘1800.csv’ saved [62728/62728]



In [8]:
# List the working directory in long format, sorted by modification time.
!ls -tl

total 68
drwxr-xr-x 1 root root  4096 Nov 25 19:13 sample_data
-rw-r--r-- 1 root root 62728 Apr 10  2022 1800.csv


In [9]:
# Peek at the first five raw lines of the file (note: there is no header row).
!head -5 1800.csv

ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,


### pandas df로 처리해보고 비교하기

In [10]:
import pandas as pd

# The file has no header row: read only the first four columns and label them.
pd_df = pd.read_csv(
    "1800.csv",
    header=None,
    usecols=[0, 1, 2, 3],
    names=["stationID", "date", "measure_type", "temperature"],
)

In [11]:
# Preview the first rows of the pandas frame.
pd_df.head()

Unnamed: 0,stationID,date,measure_type,temperature
0,ITE00100554,18000101,TMAX,-75
1,ITE00100554,18000101,TMIN,-148
2,GM000010962,18000101,PRCP,0
3,EZE00100082,18000101,TMAX,-86
4,EZE00100082,18000101,TMIN,-135


In [12]:
# Keep only the rows whose measure_type is TMIN
pd_minTemps = pd_df.loc[pd_df["measure_type"] == "TMIN"]

In [13]:
# Preview the filtered (TMIN-only) rows.
pd_minTemps.head()

Unnamed: 0,stationID,date,measure_type,temperature
1,ITE00100554,18000101,TMIN,-148
4,EZE00100082,18000101,TMIN,-135
6,ITE00100554,18000102,TMIN,-125
9,EZE00100082,18000102,TMIN,-130
11,ITE00100554,18000103,TMIN,-46


In [14]:
# Project down to the two columns needed for the per-station aggregation.
pd_stationTemps = pd_minTemps.loc[:, ["stationID", "temperature"]]

In [15]:
# Minimum temperature per station.
# The column is selected explicitly: the first positional parameter of
# groupby(...).min() is `numeric_only`, so min("temperature") does not pick a
# column -- it merely passes a truthy value for that flag.
pd_minTempsByStation = pd_stationTemps.groupby("stationID")[["temperature"]].min()
pd_minTempsByStation.head()

Unnamed: 0_level_0,temperature
stationID,Unnamed: 1_level_1
EZE00100082,-135
ITE00100554,-148


⇒ TMIN 타입의 레코드를 갖는 stationID는 2개뿐임
### Spark으로 처리해보기

In [17]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Collect the session settings in a SparkConf object instead of builder calls.
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #1")
conf.set("spark.master", "local[*]")

# getOrCreate() returns an already-running session when one exists, in which
# case the app name / master configured above would not be applied. Stop any
# active session first so that the new session is really built from `conf`.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()

spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

In [18]:
# Read the CSV without a schema: columns get default names and string types.
df = spark.read.csv("1800.csv")  # same as spark.read.format("csv").load("1800.csv")

In [19]:
# Inspect the column names and types Spark assigned when no schema was given.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [20]:
# Reload the file and rename the columns positionally with toDF().
df = (
    spark.read
    .csv("1800.csv")
    .toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
)

In [21]:
# Confirm the new column names (types are unchanged by toDF()).
df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: string (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [22]:
# Reload once more, this time letting Spark infer the column types from the data.
df = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .load("1800.csv")
    .toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
)

In [23]:
# Inspect the types produced by schema inference.
df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: integer (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [27]:
# Specify the column types explicitly instead of relying on inference
from pyspark.sql.types import (
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Only the first four columns are described; every field is nullable.
schema = StructType([
    StructField("stationID", StringType(), True),
    StructField("date", IntegerType(), True),
    StructField("measure_type", StringType(), True),
    StructField("temperature", FloatType(), True),
])

In [25]:
# Load the file using the explicit schema defined in the schema cell.
df = spark.read.schema(schema).format("csv").load("1800.csv")  # same as spark.read.schema(schema).csv("1800.csv")

In [26]:
# Inspect the schema applied by the explicit StructType.
df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: float (nullable = true)



In [29]:
# Keep only the TMIN measurements, using a Column expression with filter()
minTemps = df.filter(df["measure_type"] == "TMIN")

In [30]:
# Count the rows that passed the TMIN filter.
minTemps.count()

730

In [34]:
# Same TMIN filter, written with where() and a SQL-style condition string.
minTemps = df.where("measure_type = 'TMIN'") # Column-expression form: df.where(df.measure_type == "TMIN")

In [35]:
# Count again to check that where() selects the same number of rows as filter().
minTemps.count()

730

In [39]:
# Group the TMIN rows by station and take the lowest temperature of each group.
minTempsByStation = (
    minTemps
    .groupBy("stationID")
    .min("temperature")
)
minTempsByStation.show()  # show() prints at most 20 rows

+-----------+----------------+
|  stationID|min(temperature)|
+-----------+----------------+
|ITE00100554|          -148.0|
|EZE00100082|          -135.0|
+-----------+----------------+



In [37]:
# Project the TMIN rows down to station and temperature.
stationTemps = minTemps.select("stationID", "temperature")  # same as minTemps[["stationID", "temperature"]]

In [38]:
# Print the first five projected rows.
stationTemps.show(5)

+-----------+-----------+
|  stationID|temperature|
+-----------+-----------+
|ITE00100554|     -148.0|
|EZE00100082|     -135.0|
|ITE00100554|     -125.0|
|EZE00100082|     -130.0|
|ITE00100554|      -46.0|
+-----------+-----------+
only showing top 5 rows



In [40]:
# Bring the aggregated per-station rows back to the driver as a Python list;
# the next cell formats and prints them.
results = minTempsByStation.collect()

In [41]:
# Print one tab-separated line per station: id, then the minimum to 2 decimals.
# NOTE(review): the "F" suffix is emitted without any unit conversion in this
# notebook -- confirm the unit of the source data.
for station_row in results:
    station_id, min_temp = station_row
    print(station_id + "\t{:.2f}F".format(min_temp))

ITE00100554	-148.00F
EZE00100082	-135.00F


### Spark SQL로 처리해보기

In [42]:
# Register the DataFrame under the name "station1800" so Spark SQL can query it.
df.createOrReplaceTempView("station1800")

In [44]:
# Same aggregation in Spark SQL; "GROUP BY 1" groups by the first selected
# column (stationID).
station_min_sql = """SELECT stationID, MIN(temperature)
                        FROM station1800
                        WHERE measure_type = 'TMIN'
                        GROUP BY 1"""
results = spark.sql(station_min_sql).collect()

In [45]:
# Print each collected Row as-is.
for row in results:
    print(row)

Row(stationID='ITE00100554', min(temperature)=-148.0)
Row(stationID='EZE00100082', min(temperature)=-135.0)
