In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.master('local').appName('sql-count-trips').getOrCreate()

In [5]:
directory = "/home/dojm.ex5/spark-flink-kafka"
filename = "fhvhv_tripdata_2020-03.csv"

In [7]:
data = spark.read.csv(f"file:///{directory}/{filename}", inferSchema=True, header=True)

                                                                                

In [9]:
data.createOrReplaceTempView("mobility_data")

In [18]:
data.show(10)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   null|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   null|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
|           HV0003|              B02764|2020-03-01 00:18:42|2020-03-01 00:38:42|         209|          80|   null|
|           HV0003|              B02764|2020-03-01 00:44:24|2020-03-01 00:58:44|         256|         226|   null|
|           HV0003|              B02682|2020-03-01 00:17:23|2020-03-01 00:39:35|

In [13]:
spark.sql("""
SELECT pickup_datetime
FROM mobility_data
LIMIT 5;
""").show()

+-------------------+
|    pickup_datetime|
+-------------------+
|2020-03-01 00:03:40|
|2020-03-01 00:28:05|
|2020-03-01 00:03:07|
|2020-03-01 00:18:42|
|2020-03-01 00:44:24|
+-------------------+



In [17]:
spark.sql("""
SELECT split(pickup_datetime, ' ')[0] as pickup_date
FROM mobility_data
""").show(5)

+-----------+
|pickup_date|
+-----------+
| 2020-03-01|
| 2020-03-01|
| 2020-03-01|
| 2020-03-01|
| 2020-03-01|
+-----------+
only showing top 5 rows



In [22]:
spark.sql("""
SELECT pickup_date, count(*)
FROM (
    SELECT split(pickup_datetime, ' ')[0] as pickup_date
    FROM mobility_data
)
GROUP BY pickup_date
""").show(5)



+-----------+--------+
|pickup_date|count(1)|
+-----------+--------+
| 2020-03-03|  697880|
| 2020-03-02|  648986|
| 2020-03-01|  784246|
| 2020-03-06|  872012|
| 2020-03-05|  731165|
+-----------+--------+
only showing top 5 rows



                                                                                

In [23]:
data.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)



In [24]:
data.select("pickup_datetime")

DataFrame[pickup_datetime: string]

In [25]:
spark = SparkSession.builder.appName("udf").getOrCreate()

In [28]:
transactions = [
    ('찹쌀탕수육+짜장2', '2021-11-07 13:20:00', 22000, 'KRW'),
    ('등심탕수육+크립새우+짜장면', '2021-10-24 11:19:00', 21500, 'KRW'), 
    ('월남 쌈 2인 세트', '2021-07-25 11:12:40', 42000, 'KRW'), 
    ('콩국수+열무비빔국수', '2021-07-10 08:20:00', 21250, 'KRW'), 
    ('장어소금+고추장구이', '2021-07-01 05:36:00', 68700, 'KRW'), 
    ('족발', '2020-08-19 19:04:00', 32000, 'KRW'),  
]

schema = ["name", "datetime", "price", "currency"]
df = spark.createDataFrame(data=transactions, schema=schema)

In [31]:
df.createOrReplaceTempView("transaction")

In [34]:
spark.sql("""
SELECT * 
FROM transaction
""").show()

+--------------------------+-------------------+-----+--------+
|                      name|           datetime|price|currency|
+--------------------------+-------------------+-----+--------+
|          찹쌀탕수육+짜장2|2021-11-07 13:20:00|22000|     KRW|
|등심탕수육+크립새우+짜장면|2021-10-24 11:19:00|21500|     KRW|
|          월남 쌈 2인 세트|2021-07-25 11:12:40|42000|     KRW|
|       콩국수+열무비빔국수|2021-07-10 08:20:00|21250|     KRW|
|       장어소금+고추장구이|2021-07-01 05:36:00|68700|     KRW|
|                      족발|2020-08-19 19:04:00|32000|     KRW|
+--------------------------+-------------------+-----+--------+



In [37]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

In [38]:
def squared1(s):
    return s * s

spark.udf.register("squared1", squared1, LongType())

<function __main__.squared1(s)>

In [40]:
@udf("long")
def squared2(s):
    return s * s

spark.udf.register("squared2", squared2)

<function __main__.squared2(s)>

In [45]:
spark.sql("""
SELECT name, squared1(price), squared2(price)
FROM transaction
""").show()

[Stage 18:>                                                         (0 + 1) / 1]                                                                                

+--------------------------+---------------+---------------+
|                      name|squared1(price)|squared2(price)|
+--------------------------+---------------+---------------+
|          찹쌀탕수육+짜장2|      484000000|      484000000|
|등심탕수육+크립새우+짜장면|      462250000|      462250000|
|          월남 쌈 2인 세트|     1764000000|     1764000000|
|       콩국수+열무비빔국수|      451562500|      451562500|
|       장어소금+고추장구이|     4719690000|     4719690000|
|                      족발|     1024000000|     1024000000|
+--------------------------+---------------+---------------+



In [46]:
def read_number(n):
    units = ["", "십", "백", "천", "만"]
    nums = "일이삼사오육칠팔구"
    
    result = []
    i = 0
    while n > 0:
        n, r = divmod(n, 10)
        if r > 0:
            result.append(nums[r - 1] + units[i])
        i += 1
    
    return "".join(result[::-1])

from pyspark.sql.types import StringType

spark.udf.register("read_number", read_number, StringType())
print(read_number(21250))

이만일천이백오십


In [50]:
spark.sql("""
SELECT name, read_number(price)
FROM transaction
""").show()

+--------------------------+------------------+
|                      name|read_number(price)|
+--------------------------+------------------+
|          찹쌀탕수육+짜장2|          이만이천|
|등심탕수육+크립새우+짜장면|      이만일천오백|
|          월남 쌈 2인 세트|          사만이천|
|       콩국수+열무비빔국수|  이만일천이백오십|
|       장어소금+고추장구이|      육만팔천칠백|
|                      족발|          삼만이천|
+--------------------------+------------------+



In [54]:
def get_weekday(date):
    import calendar
    return calendar.day_name[date.weekday()]

spark.udf.register("get_weekday", get_weekday)

22/01/25 13:20:28 WARN SimpleFunctionRegistry: The function get_weekday replaced a previously registered function.


<function __main__.get_weekday(date)>

In [55]:
spark.sql("""
SELECT datetime, get_weekday(TO_DATE(datetime)) AS day_of_week
FROM transaction
""").show()

+-------------------+-----------+
|           datetime|day_of_week|
+-------------------+-----------+
|2021-11-07 13:20:00|     Sunday|
|2021-10-24 11:19:00|     Sunday|
|2021-07-25 11:12:40|     Sunday|
|2021-07-10 08:20:00|   Saturday|
|2021-07-01 05:36:00|   Thursday|
|2020-08-19 19:04:00|  Wednesday|
+-------------------+-----------+

