In [None]:
!pip install pyspark

In [None]:
import os, sys

# Must be set BEFORE pyspark.pandas is imported and before a Spark context starts:
# pyspark.pandas checks it at import time (needed with pyarrow>=2) and warns if missing.
os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row, Column
from pyspark.sql import functions as f
from pyspark.sql.functions import pandas_udf, expr
import pyspark.pandas as ps

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import easydict

from typing import *
from tqdm import tqdm



# __PySpark__

|Numpy|PySpark|
|---|---|
|np.int8|ByteType|
|np.int16|ShortType|
|np.int32|IntegerType|
|np.int64|LongType|
|np.float32|FloatType|
|np.float64|DoubleType|
|np.str_|StringType|

In [None]:
# Build (or reuse, via getOrCreate) a local SparkSession that backs the DataFrame API.
spark = SparkSession.builder.master('local').appName('Mata').config(conf=SparkConf()).getOrCreate()

# Bare last expression -> Jupyter renders the session summary.
spark

### Data load, save

In [None]:
# Conversions between the three DataFrame flavours used in this notebook:
#   df   -> pandas.core.frame.DataFrame
#   pdf  -> pyspark.sql.dataframe.DataFrame
#   psdf -> pyspark.pandas.frame.DataFrame
# A small sample frame is built so the cell runs on a fresh kernel; its schema matches
# the printed output below (a: long, b: double, c: string).
df = pd.DataFrame({'a': [1, -2, 3], 'b': [1.0, 2.0, 3.0], 'c': ['x', 'y', 'z']})

pdf = spark.createDataFrame(df)   # pandas          -> Spark SQL
psdf = ps.from_pandas(df)         # pandas          -> pandas-on-Spark
pdf = psdf.to_spark()             # pandas-on-Spark -> Spark SQL
psdf = pdf.pandas_api()           # Spark SQL       -> pandas-on-Spark
df = pdf.toPandas()               # Spark SQL       -> pandas
df = psdf.to_pandas()             # pandas-on-Spark -> pandas

pdf.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)



In [None]:
# Load the raw CSV (header row, inferred column types) and keep the first 10,000 rows
# for quick experimentation.
# NOTE(review): absolute path '/raw_dataset.csv' — confirm location / make it configurable.
# NOTE(review): later cells use `raw`, which is never defined; presumably this frame — confirm.
df = (spark
      .read
      .option('inferSchema', True)
      .option('header', True)
      .csv('/raw_dataset.csv')
      .limit(10_000))

In [None]:
# Read/write round trips for CSV and Parquet with both APIs.
# Outputs go to separate *_out paths: writing back to the path being read either fails
# (Spark's default mode is 'errorifexists') or, with overwrite, deletes the input that the
# lazily-evaluated DataFrame is still reading. mode='overwrite' keeps the cell re-runnable.
pdf = spark.read.csv('foo.csv', header=True, inferSchema=True)
pdf.write.csv('foo_out.csv', header=True, mode='overwrite')

pdf = spark.read.parquet('bar.parquet')
pdf.write.parquet('bar_out.parquet', mode='overwrite')

# pandas-on-Spark writers also produce a directory of part files; their default mode 'w' overwrites.
psdf = ps.read_csv('foo.csv')
psdf.to_csv('foo_ps_out.csv')

psdf = ps.read_parquet('bar.parquet')
psdf.to_parquet('bar_ps_out.parquet')

In [None]:
import json

# Sample records (one with a null age) used to demonstrate JSON loading.
json_dict = [
    {'age': None, 'name': 'Michael'},
    {'age': 30, 'name': 'Andy'},
    {'age': 19, 'name': 'Justin'},
]

# Spark's JSON reader (and ps.read_json, which only supports lines=True) is documented to
# expect JSON Lines: one self-contained object per line rather than one big array.
with open('info.json', 'w', encoding='utf-8') as json_file:
    json_file.writelines(json.dumps(rec) + '\n' for rec in json_dict)

pdf = spark.read.json('info.json')
psdf = ps.read_json('info.json')



## Spark SQL
`SQL`을 Spark 함수로 정의

### CoreClasses

In [None]:
# withColumn(colName, col): return a new DataFrame with the column added (or replaced).
# pyspark.sql.functions is imported as `f` at the top of the notebook.
# NOTE(review): assumes pdf has a numeric column `a` (e.g. the pandas-converted sample);
# after the JSON cell pdf only has age/name — confirm which frame is intended.
pdf = pdf.withColumn('new_a', f.abs(pdf.a))

DataFrame[a: bigint, b: double, c: string, new_a: bigint]

In [None]:
# cast(type): change a column's type; alias(name): name the resulting column;
# asc()/desc(): turn a column expression into a sort expression.
# NOTE(review): `raw` is not defined in this notebook — presumably the raw CSV frame.
(1 / raw['q_seq']).cast('double').asc().alias('sorted_q_seq')
df.select(f.col('q_seq').alias('concept_count'), f.col('rightFlag'))

Column<'(1 / conceptCount) ASC NULLS FIRST AS sorted_conceptCount'>

In [None]:
# Rename every column at once: toDF needs exactly one new name per existing column,
# so start from the current names and edit them (an empty list makes toDF fail).
new_colnames = list(df.columns)
assert len(new_colnames) == len(df.columns), 'toDF needs one name per existing column'
df = df.toDF(*new_colnames)

# Rename a single column (a no-op when 'old_colname' is not in the schema).
df = df.withColumnRenamed('old_colname', 'new_colname')

In [None]:
# between, isin, contains (isNull / isNotNull follow the same pattern)
# Inside select() each predicate adds a boolean column; no rows are removed.
# To keep only matching rows, pass the same expression to filter()/where() instead.
# NOTE(review): `raw` is not defined in this notebook — presumably the raw CSV frame.
raw.select(raw.q_seq, raw.q_seq.between(10, 12))         # True where 10 <= q_seq <= 12 (inclusive)
raw.select(raw.q_seq, raw.q_seq.isin([10, 11, 12]))      # True where q_seq is 10, 11 or 12
raw.select(raw.q_seq, raw.q_seq.contains('c'))           # True where the value contains 'c'

DataFrame[q_seq: int, ((conceptCount >= 10) AND (conceptCount <= 12)): boolean]

In [None]:
# like (SQL LIKE pattern), startswith, substr(startPos, length) with a 1-based startPos.
raw.filter(raw.ev_date.like('2022-10%'))
raw.filter(raw.ev_date.startswith('2022-10'))
# alias() belongs on the column: DataFrame.alias() only sets a DataFrame alias and would
# leave the substring column with its auto-generated name.
raw.select(raw.ev_date.substr(1, 4).alias('year'))

In [None]:
# when(): conditional column (SQL CASE WHEN) created inside select().
# Conditions are tested in order; otherwise() supplies the ELSE value.
raw.select(
    raw.expHeight,
    f.when(raw.expHeight > 2000, 0).when(raw.expHeight < 1000, 1).otherwise(0),
).limit(3).show()

+---------+-----------------------------------------------------------------------------+
|expHeight|CASE WHEN (expHeight > 2000) THEN 0 WHEN (expHeight < 1000) THEN 1 ELSE 0 END|
+---------+-----------------------------------------------------------------------------+
|      100|                                                                            1|
|      116|                                                                            1|
|      192|                                                                            1|
+---------+-----------------------------------------------------------------------------+



### DataFrame

In [None]:
# collect(): return rows to the driver as a list of Row objects.
# Limit first — collect() on the whole frame pulls every row into driver memory.
display(raw.limit(5).collect())
display(raw.dtypes)   # list of (column name, type string) pairs
raw.schema            # StructType describing every column

In [None]:
# drop(): remove a column. dropDuplicates() (drop_duplicates is an alias) keeps one row
# per distinct (ex_seq, q_seq) combination.
raw.drop('q_seq')
raw.dropDuplicates(['ex_seq', 'q_seq'])

In [None]:
# Missing values: dropna()/na.drop() remove rows, fillna()/na.fill() replace nulls.
# how='any' drops a row containing at least one null; how='all' only when every value is null.
raw.dropna('any')
raw.na.drop()
raw.fillna(value=0)
raw.na.fill(value=0)
raw.na.fill(value={'rightFlag': 0, 'expHeight': 1000})   # per-column fill values

In [None]:
# filter(): keep rows matching a condition, given either as a Column or as a SQL string.
raw.filter(raw.expHeight > 0)
raw.filter('expHeight > 0')

# select(): project columns/expressions into a new DataFrame.
raw.select('*').limit(5).collect()   # limit before collecting to the driver
# Arithmetic needs a Column: 'q_seq' + 10 is Python str + int and raises TypeError.
raw.select('ex_seq', (f.col('q_seq') + 10).alias('new_q'))

##### __transform__

In [None]:
# DataFrame.transform(func, *args, **kwargs): apply a function that takes and returns a
# DataFrame, so custom steps can be chained fluently.
def cast_all_to_int(input_df):
    """Return input_df with every column cast to int."""
    return input_df.select([f.col(col_name).cast('int') for col_name in input_df.columns])

def sort_columns_asc(input_df):
    """Return input_df with its columns reordered alphabetically by name."""
    return input_df.select(*sorted(input_df.columns))

df = spark.createDataFrame([(1, 1.0), (2, 2.0)], ['int', 'float'])
df.transform(cast_all_to_int).transform(sort_columns_asc).show()

In [None]:
# join(other, on, how): `other` must be a DataFrame; a Column such as raw.rightFlag fails.
# NOTE(review): the intended right-hand table is unclear; a projection of raw keyed by
# seqNo is used as a placeholder — replace it with the real lookup DataFrame.
(raw.join(raw.select('seqNo', 'rightFlag'), on='seqNo', how='inner')
    .select('ex_seq', 'q_seq')
    .sort(f.desc('q_seq')))

In [None]:
# show(): print rows (20 by default); limit(n): new DataFrame with at most n rows;
# first()/head(): first Row; tail(n): last n rows, collected to the driver.
# limit() and tail() require a row count; DataFrame has no last() method.
raw.show()
raw.limit(5)
raw.first()
raw.head()
raw.tail(5)

In [None]:
# Sort by seqNo, largest first (orderBy is an alias of sort).
raw.orderBy(raw.seqNo.desc())

In [None]:
# describe(): count, mean, stddev, min, max.
# summary(): the same plus 25%/50%/75% percentiles; the statistics can also be chosen explicitly.
raw.describe('expHeight').show()
raw.select('expHeight').summary().show()

+-------+------------------+
|summary|         expHeight|
+-------+------------------+
|  count|           6366626|
|   mean| 393.4629398679929|
| stddev|357.69690700290295|
|    min|                -2|
|    max|              5214|
+-------+------------------+



#### Grouping

`pandas_udf` (pandas user-defined function): Spark transfers the data to pandas (via Apache Arrow) so the function works on whole pandas batches, which `allows vectorized operations`

`Pandas UDF` behaves as a regular `PySpark function API` in general

* `apply`는 `@pandas_udf(return_type)`을 함수 위에 설정해야 함 (인자는 입력 타입이 아니라 반환 타입)
* `applyInPandas`는 `@pandas_udf(return_type)`을 설정할 필요 없지만 `schema`를 설정해야 함
* `pyspark.pandas`를 사용하는 경우 udf나 schema를 설정할 필요 없음

In [None]:
# groupBy (groupby is an alias) followed by count / agg with mean, avg, min, max, sum.
raw.groupBy('qType', 'imageAudit').count().collect()
raw.groupBy('qType').agg({'expHeight': 'mean', 'rightFlag': 'sum'})
raw.groupBy(raw.qType).agg({'*': 'count'})

In [None]:
# Series -> scalar pandas UDF, used as a grouped aggregate.
# The decorator argument is the UDF's RETURN type (for several fields: 'col1 type1, col2 type2').
# A mean is fractional, so it must be 'double'; 'long' would truncate e.g. 1.5 to 1.
@pandas_udf('double')
def mean_udf(v: pd.Series) -> float:
    """Return the arithmetic mean of one group's values."""
    return v.mean()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], schema=("id", "v"))
df.groupby('id').agg(mean_udf(df['v']).alias('mean_v')).show()

+---+------+
| id|mean_v|
+---+------+
|  1|     1|
|  2|     6|
+---+------+



In [None]:
def normalize(pdf):
    """Standardise the ``v`` column within one group.

    Returns a copy of ``pdf`` whose ``v`` is (v - mean) / std, using pandas' sample
    standard deviation (ddof=1); every other column passes through unchanged.
    """
    values = pdf['v']
    centred = values - values.mean()
    return pdf.assign(v=centred / values.std())

# Grouped map: each id's rows arrive as one pandas DataFrame; the result must match `schema`.
df.groupBy('id').applyInPandas(normalize, schema='id long, v double').show()

+---+-------------------+
| id|                  v|
+---+-------------------+
|  1|-0.7071067811865475|
|  1| 0.7071067811865475|
|  2|-0.8320502943378437|
|  2|-0.2773500981126146|
|  2| 1.1094003924504583|
+---+-------------------+



In [None]:
@pandas_udf('first string, last string')
def split_expand(s: pd.Series) -> pd.DataFrame:
    """Split each full name into a (first, last) struct.

    Splits on the first whitespace run only, so 'Anna Maria Lopez' -> ('Anna', 'Maria Lopez')
    instead of yielding a third column that does not fit the two-field schema. Single-token
    names get a null `last`. Output columns are named after the declared struct fields.
    """
    tokens = s.str.split(n=1)  # list of at most two pieces per name
    return pd.DataFrame({'first': tokens.str.get(0), 'last': tokens.str.get(1)})

# One-row frame with a single string column `name`, passed through the struct-returning UDF.
df = spark.createDataFrame([('John Doe',)], ['name'])
df.select(split_expand(df.name)).show()

+------------------+
|split_expand(name)|
+------------------+
|       {John, Doe}|
+------------------+



### Functions

df.isna().count()와 같이 사용하던 대부분의 `method`들을 `함수형`으로 바꿔놓음

In [None]:
# when, isnan, isnull, abs, sqrt
# Commonly used inside .select() to create new columns.

In [None]:
df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
# isnan: True for NaN; isnull: True for SQL NULL (a NaN is not NULL).
# Functions accept either a column name or a Column. display() shows the intermediate
# results too, not only the cell's last expression.
display(df.select(f.isnan('a').alias('r1'), f.isnan(df.a).alias('r2')).collect())
display(df.select(f.isnull('a').alias('r1'), f.isnull(df.a).alias('r2')).collect())
df.select(f.abs('a').alias('abs_a'), f.sqrt('b').alias('sqrt_b')).show()

raw.select(raw.expHeight, f.when(raw.expHeight > 2000, 0).when(raw.expHeight < 1000, 1).otherwise(0)).limit(3).show()

[Row(r1=False, r2=False), Row(r1=False, r2=False)]

In [None]:
# transform
df = spark.createDataFrame([(1, [1, 2, 3, 4]), (2, [1, 3, 5, 7])], ("key", "values"))
df.select('key', F.transform('values', lambda x: x*2).alias('doubled')).show()

def alternate(x, i):
    return F.when(i%2 == 0, x).otherwise(-x)
df.select(F.transform('values', alternate).alias('alternated')).show()

+---+--------------+
|key|       doubled|
+---+--------------+
|  1|  [2, 4, 6, 8]|
|  2|[2, 6, 10, 14]|
+---+--------------+



### Example

In [None]:
# Per column, count values that look missing: 'None'/'NULL' strings, empty strings, or NaN.
# count(when(cond, c)) only counts rows where cond holds (when() yields null otherwise).
# `== ''` needs its own parentheses: Python's `|` binds tighter than `==`.
# NOTE(review): contains / == '' assume string columns and isnan numeric ones — confirm df's schema.
df.select(
    [f.count(
        f.when(
            f.col(c).contains('None') | f.col(c).contains('NULL') | (f.col(c) == '') | f.isnan(c), c
        )
    ).alias(c) for c in df.columns])

In [None]:
# Map gender codes to labels: 'M' -> 'Male', 'F' -> 'Female', null -> "", anything else unchanged.
# NOTE(review): assumes the code column is `gender` (the column used in otherwise()) — confirm.
(df
 .withColumn('new_col', f.when(df.gender == 'M', 'Male')
 .when(df.gender == 'F', 'Female')
 .when(df.gender.isNull(), "")
 .otherwise(df.gender)))

## Pandas API
`pandas`의 함수 대부분을 사용하여 PySpark를 사용할 수 있음

In [None]:
# NOTE(review): ps.from_pandas expects a pandas DataFrame — df must be pandas here, not Spark.
psdf = ps.from_pandas(df)
# pandas_api(index_col=None): its only parameter is the index column(s), so it is called
# without passing the DataFrame itself.
psdf = pdf.pandas_api()

pyspark.sql.dataframe.DataFrame

## Working with SQL

In [None]:
# SQL문법으로 사용할 수 있는 테이블 생성
df.createOrReplaceTempView('table')

# 저장된 테이블 확인
print(spark.catalog.listTables())

In [None]:
# (Re)register the view and count its rows with Spark SQL.
df.createOrReplaceTempView(name='table')
spark.sql('SELECT count(*) from table').show()

In [None]:
# Scalar pandas UDF (declared return type 'integer') registered so SQL can call it.
@pandas_udf('integer')
def add_one(s: pd.Series) -> pd.Series:
    """Add 1 to every element of the batch."""
    return s.add(1)

spark.udf.register('add_one', add_one)
# NOTE(review): assumes the view 'table' has a column v1 — confirm.
spark.sql('SELECT add_one(v1) FROM table').show()

# PySpark ML

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Both evaluators live in pyspark.ml.evaluation; the multiclass one is
# MulticlassClassificationEvaluator (pyspark.ml.recommendation has no RegressionEvaluator).
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
# Classifiers used further down.
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

In [None]:
# VectorAssembler: pack several numeric columns into one vector column,
# e.g. col1, col2, col3 -> [col1, col2, col3], stored in 'features'.
required_features = ['aid', 'rank']
assembler = VectorAssembler(outputCol='features', inputCols=required_features)
data = assembler.transform(dataset=df)

In [None]:
# 80/20 train/test split; a fixed seed makes the split (and every metric below) reproducible.
splits = data.randomSplit([0.8, 0.2], seed=42)
train = splits[0]
# withColumn() needs a Column expression; renaming 'label' -> 'trueLabel' is withColumnRenamed().
test = splits[1].withColumnRenamed('label', 'trueLabel')

train_rows = train.count()
test_rows = test.count()
print('Training Rows:', train_rows, 'Test Rows:', test_rows)
train.select('features').show(truncate=False)

In [None]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

# Param names are case-sensitive keyword arguments: featuresCol and maxIter
# (featurecols / maxiter are rejected with TypeError).
lr = LogisticRegression(labelCol='target', featuresCol='features', maxIter=10, regParam=0.3)
rf = RandomForestClassifier(labelCol='target', featuresCol='features', maxDepth=5)

In [None]:
# Fit the random forest on the training split, then score both splits.
model = rf.fit(dataset=train)
pred_train, pred_test = model.transform(train), model.transform(test)
pred_test.select('prediction').show()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the predictions. labelCol must match the label the classifiers were trained
# on ('target'); the keyword is metricName.
evaluator = MulticlassClassificationEvaluator(
    labelCol='target',
    predictionCol='prediction',
    metricName='accuracy'
)

acc = evaluator.evaluate(pred_train)
print('Train accuracy = ', acc)

acc = evaluator.evaluate(pred_test)
print('Test accuracy = ', acc)

In [None]:
from pyspark.ml.stat import Correlation

# Pearson correlation matrix of the assembled 'features' vector column (VectorAssembler's
# outputCol). It has one row/column per assembled input, so its column names are
# required_features rather than all of df.columns.
corr_mat = Correlation.corr(data, 'features').collect()[0][0].toArray().tolist()
corr_mat_df = spark.createDataFrame(corr_mat, schema=required_features)
corr_mat_df.show()

In [None]:
# Bring the (small) correlation matrix to pandas, label rows like columns, and shade it.
plot_corr = corr_mat_df.toPandas()
plot_corr.index = corr_mat_df.columns
plot_corr.style.background_gradient(cmap='Blues')

# Window

In [None]:
from pyspark.sql import Window
from pyspark.sql import Row

In [None]:
# Get the active SparkSession (getOrCreate returns the existing one rather than starting a new one).
spark = SparkSession.builder.getOrCreate()

In [None]:
# (id, product, number) rows; ids run 1..12 in the listed order.
seq = [
    (row_id, product, number)
    for row_id, (product, number) in enumerate(
        [
            ("chip", 1), ("drink", 2), ("chip", 2), ("fish", 1),
            ("drink", 3), ("other", 5), ("drink", 1), ("fish", 4),
            ("other", 1), ("other", 6), ("drink", 5), ("fish", 7),
        ],
        start=1,
    )
]

df = spark.createDataFrame(seq, ["id", "product", "number"])

In [None]:
# window1: all rows of the same product, unordered -> whole-partition aggregates.
# window2: the same partition ordered by id, framed from the first row
#          (Window.unboundedPreceding) to the current row -> cumulative aggregates.
window1 = Window.partitionBy('product')
window2 = window1.orderBy('id').rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [None]:
# Per-product min/max/avg over the whole partition (window1) plus a running total
# ordered by id (window2). cumulativeSum is computed once.
(df
 .withColumn('min', f.min('number').over(window1))
 .withColumn('max', f.max('number').over(window1))
 .withColumn('avg', f.avg('number').over(window1))
 .withColumn('cumulativeSum', f.sum('number').over(window2))
 .show())

+---+-------+------+---+---+----+-------------+
| id|product|number|min|max| avg|cumulativeSum|
+---+-------+------+---+---+----+-------------+
|  1|   chip|     1|  1|  2| 1.5|            1|
|  3|   chip|     2|  1|  2| 1.5|            3|
|  2|  drink|     2|  1|  5|2.75|            2|
|  5|  drink|     3|  1|  5|2.75|            5|
|  7|  drink|     1|  1|  5|2.75|            6|
| 11|  drink|     5|  1|  5|2.75|           11|
|  4|   fish|     1|  1|  7| 4.0|            1|
|  8|   fish|     4|  1|  7| 4.0|            5|
| 12|   fish|     7|  1|  7| 4.0|           12|
|  6|  other|     5|  1|  6| 4.0|            5|
|  9|  other|     1|  1|  6| 4.0|            6|
| 10|  other|     6|  1|  6| 4.0|           12|
+---+-------+------+---+---+----+-------------+



In [None]:
# Shift `number` forward (lead) and backward (lag) by 1-3 rows in id order; positions past
# either end get the default 0. Without partitionBy, Spark puts all rows in one partition
# (it logs a warning) — acceptable for this toy frame.
window = Window.orderBy('id')

df.select(
    '*',
    f.lead('number', 3, 0).over(window).alias('lead3'),
    f.lead('number', 2, 0).over(window).alias('lead2'),
    f.lead('number', 1, 0).over(window).alias('lead1'),
    f.col('number').alias('center'),
    f.lag('number', 1, 0).over(window).alias('lag1'),
    f.lag('number', 2, 0).over(window).alias('lag2'),
    f.lag('number', 3, 0).over(window).alias('lag3'),
).show()

+---+-------+------+-----+-----+-----+------+----+----+----+
| id|product|number|lead3|lead2|lead1|center|lag1|lag2|lag3|
+---+-------+------+-----+-----+-----+------+----+----+----+
|  1|   chip|     1|    1|    2|    2|     1|   0|   0|   0|
|  2|  drink|     2|    3|    1|    2|     2|   1|   0|   0|
|  3|   chip|     2|    5|    3|    1|     2|   2|   1|   0|
|  4|   fish|     1|    1|    5|    3|     1|   2|   2|   1|
|  5|  drink|     3|    4|    1|    5|     3|   1|   2|   2|
|  6|  other|     5|    1|    4|    1|     5|   3|   1|   2|
|  7|  drink|     1|    6|    1|    4|     1|   5|   3|   1|
|  8|   fish|     4|    5|    6|    1|     4|   1|   5|   3|
|  9|  other|     1|    7|    5|    6|     1|   4|   1|   5|
| 10|  other|     6|    0|    7|    5|     6|   1|   4|   1|
| 11|  drink|     5|    0|    0|    7|     5|   6|   1|   4|
| 12|   fish|     7|    0|    0|    0|     7|   5|   6|   1|
+---+-------+------+-----+-----+-----+------+----+----+----+



In [None]:
window = Window.orderBy('product')

df.select(
    '*',
    f.row_number().over(window).alias('row_number'),  # 1, 2, 3, ... even within ties; tie order not guaranteed
    f.rank().over(window).alias('rank'),              # ties share a rank, next rank skips (1, 1, 3, ...)
    f.dense_rank().over(window).alias('dense_rank'),  # ties share a rank, no gaps (1, 1, 2, ...)
).show()

+---+-------+------+----------+----+----------+
| id|product|number|row_number|rank|dense_rank|
+---+-------+------+----------+----+----------+
|  1|   chip|     1|         1|   1|         1|
|  3|   chip|     2|         2|   1|         1|
|  2|  drink|     2|         3|   3|         2|
|  5|  drink|     3|         4|   3|         2|
|  7|  drink|     1|         5|   3|         2|
| 11|  drink|     5|         6|   3|         2|
|  4|   fish|     1|         7|   7|         3|
|  8|   fish|     4|         8|   7|         3|
| 12|   fish|     7|         9|   7|         3|
|  6|  other|     5|        10|  10|         4|
|  9|  other|     1|        11|  10|         4|
| 10|  other|     6|        12|  10|         4|
+---+-------+------+----------+----+----------+

