In [None]:
!pip install pyspark==3.3.1 py4j==0.10.9.5

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark DataFrame #2')\
        .getOrCreate()

In [None]:
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/customer-orders.csv

--2023-01-15 20:45:38--  https://s3-geospatial.s3-us-west-2.amazonaws.com/customer-orders.csv
Resolving s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)... 52.92.241.154, 52.218.229.249, 52.92.197.42, ...
Connecting to s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)|52.92.241.154|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 146855 (143K) [text/csv]
Saving to: ‘customer-orders.csv.1’


2023-01-15 20:45:38 (581 KB/s) - ‘customer-orders.csv.1’ saved [146855/146855]



In [None]:
!ls -tl

total 296
drwxr-xr-x 2 root root   4096 Jan 15 20:43 spark-warehouse
drwxr-xr-x 1 root root   4096 Jan  9 14:36 sample_data
-rw-r--r-- 1 root root 146855 Apr 10  2022 customer-orders.csv
-rw-r--r-- 1 root root 146855 Apr 10  2022 customer-orders.csv.1


In [None]:
!head -5 customer-orders.csv

44,8602,37.19
35,5368,65.89
2,3391,40.64
47,6694,14.98
29,680,13.08


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType([ \
                     StructField("cust_id", StringType(), True), \
                     StructField("item_id", StringType(), True), \
                     StructField("amount_spent", FloatType(), True)])

In [None]:
df = spark.read.schema(schema).csv("customer-orders.csv")
df.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- amount_spent: float (nullable = true)



In [None]:
df_ca = df.groupBy("cust_id").sum("amount_spent")

In [None]:
df_ca.show()

+-------+------------------+
|cust_id| sum(amount_spent)|
+-------+------------------+
|     51| 4975.219970226288|
|      7| 4755.070008277893|
|     15| 5413.510010659695|
|     54| 6065.390002984554|
|     11| 5152.289969373494|
|     29|5032.5300433933735|
|     69| 5123.010002791882|
|     42| 5696.840004444122|
|     73| 6206.199985742569|
|     87| 5206.400022745132|
|     64| 5288.690012812614|
|      3| 4659.629958629608|
|     30| 4990.720004022121|
|     34|5330.8000039458275|
|     59| 5642.890004396439|
|      8|5517.2399980425835|
|     22| 5019.449993014336|
|     28|  5000.71000123024|
|     85|  5503.42998456955|
|     35|  5155.41999566555|
+-------+------------------+
only showing top 20 rows



In [None]:
df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")

In [None]:
df_ca.show(10)

+-------+------------------+
|cust_id|               sum|
+-------+------------------+
|     51| 4975.219970226288|
|      7| 4755.070008277893|
|     15| 5413.510010659695|
|     54| 6065.390002984554|
|     11| 5152.289969373494|
|     29|5032.5300433933735|
|     69| 5123.010002791882|
|     42| 5696.840004444122|
|     73| 6206.199985742569|
|     87| 5206.400022745132|
+-------+------------------+
only showing top 10 rows



In [None]:
import pyspark.sql.functions as f

df_ca = df.groupBy("cust_id") \
   .agg(f.sum('amount_spent').alias('sum'))

In [None]:
df_ca.show(5)

+-------+-----------------+
|cust_id|              sum|
+-------+-----------------+
|     51|4975.219970226288|
|      7|4755.070008277893|
|     15|5413.510010659695|
|     54|6065.390002984554|
|     11|5152.289969373494|
+-------+-----------------+
only showing top 5 rows



MAX와 AVG 값도 구하고 싶다면?

In [None]:
df.groupBy("cust_id") \
   .agg(
       f.sum('amount_spent').alias('sum'),
       f.max('amount_spent').alias('max'),
       f.avg('amount_spent').alias('avg')).collect()

[Row(cust_id='51', sum=4975.219970226288, max=97.61000061035156, avg=48.77666637476753),
 Row(cust_id='7', sum=4755.070008277893, max=98.5999984741211, avg=50.58585115189248),
 Row(cust_id='15', sum=5413.510010659695, max=99.56999969482422, avg=52.05298087172783),
 Row(cust_id='54', sum=6065.390002984554, max=99.2300033569336, avg=49.31211384540288),
 Row(cust_id='11', sum=5152.289969373494, max=99.11000061035156, avg=47.70638860531013),
 Row(cust_id='29', sum=5032.5300433933735, max=99.87000274658203, avg=45.75027312175794),
 Row(cust_id='69', sum=5123.010002791882, max=98.91999816894531, avg=51.230100027918816),
 Row(cust_id='42', sum=5696.840004444122, max=99.05999755859375, avg=56.968400044441225),
 Row(cust_id='73', sum=6206.199985742569, max=99.98999786376953, avg=52.594915133411604),
 Row(cust_id='87', sum=5206.400022745132, max=99.97000122070312, avg=54.2333335702618),
 Row(cust_id='64', sum=5288.690012812614, max=99.55999755859375, avg=49.427009465538454),
 Row(cust_id='3', su

## Spark SQL로 처리해보기

In [None]:
df.createOrReplaceTempView("customer_orders")

In [None]:
spark.sql("""SELECT cust_id, SUM(amount_spent) sum, MAX(amount_spent) max, AVG(amount_spent) avg
FROM customer_orders
GROUP BY 1""").head(5)

[Row(cust_id='51', sum=4975.219970226288, max=97.61000061035156, avg=48.77666637476753),
 Row(cust_id='7', sum=4755.070008277893, max=98.5999984741211, avg=50.58585115189248),
 Row(cust_id='15', sum=5413.510010659695, max=99.56999969482422, avg=52.05298087172783),
 Row(cust_id='54', sum=6065.390002984554, max=99.2300033569336, avg=49.31211384540288),
 Row(cust_id='11', sum=5152.289969373494, max=99.11000061035156, avg=47.70638860531013)]

Spark은 기본으로 in-memory 카탈로그를 사용. 스토리지 기반의 카탈로그를 쓰고 싶다면 SparkSession 설정할 때 enableHiveSupport()를 호출 (Hive metastore를 카탈로그로 사용하며 Hive UDF와 Hive 파일포맷 사용 가능)

In [None]:
spark.catalog.listTables()

[Table(name='customer_orders', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]