In [1]:
from pyspark.sql import SparkSession
# Build (or reuse, via getOrCreate) a Spark session that runs with the 'local'
# master, with 2g configured for both the driver and the executor.
spark = (
    SparkSession.builder
    .master('local')
    .appName('demo')
    .config("spark.driver.memory", "2g")
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)

# SparkContext of the session; the cells below use it for the RDD API.
sc = spark.sparkContext

In [2]:
# Display every (key, value) pair of the active SparkContext configuration,
# to check which settings are in effect.
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.executor.memory', '2g'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.driver.host', 'DESKTOP-2K42TPL')

In [13]:
# Load ./test.csv as an RDD of raw text lines (one string per line; the first
# line is the quoted CSV header).
data_rdd = sc.textFile('./test.csv')
# Peek at the first five lines to check the column layout.
data_rdd.take(5)

['"기준_날짜","집계_기준","기준_시간대","시작_대여소_ID","시작_대여소명","종료_대여소_ID","종료_대여소명","전체_건수","전체_이용_분","전체_이용_거리"',
 '"20240421","출발시간","0000","ST-1041","성내1동_033_1","ST-495","성내3동_014_1","1","9","1752"',
 '"20240421","출발시간","0000","ST-1046","성내2동_015_1","ST-3209","천호2동_048_1","1","8","1000"',
 '"20240421","출발시간","0000","ST-105","성수2가3동_015_1","ST-987","자양3동_004_1","1","13","1880"',
 '"20240421","출발시간","0000","ST-1061","천호1동_039_1","ST-1626","천호3동_040_1","1","6","1344"']

In [21]:
import csv
from collections import defaultdict
from collections.abc import Iterable
from typing import List

def fiilterHead(seq:str):
    """Predicate for RDD.filter that keeps data rows and drops the header row.

    Args:
        seq: One raw text line of the CSV file.

    Returns:
        False when the line starts with the header prefix '"기준',
        True for every other line.
    """
    is_header = seq.startswith('"기준')
    return not is_header


def extractRdd(iterator:Iterable[str]):
    """Sum the count column per (start, end) station pair within one partition.

    Intended for RDD.mapPartitions: the totals only cover the lines handed to
    this call, so the same route can still be emitted by other partitions.

    Each line is parsed with the csv module instead of a plain split(','), so
    a comma inside a quoted field (e.g. a station name) no longer shifts the
    column indices, and the surrounding quotes are removed by the parser
    rather than by slicing off the first and last character.

    Args:
        iterator: Raw CSV data lines (header already removed). Column 3 is the
            start station ID, column 5 the end station ID and column 7 the
            integer count.

    Returns:
        A list of ("<start>,<end>", total_count) tuples, in first-seen order.

    Raises:
        IndexError: If a non-empty row has fewer than 8 columns.
        ValueError: If column 7 is not an integer.
    """
    route_totals = defaultdict(int)

    for row in csv.reader(iterator):
        if not row:
            # csv.reader yields [] for blank lines; there is nothing to count.
            continue
        start, end, num = row[3], row[5], int(row[7])

        # The two station IDs are joined into one string key "start,end".
        route_totals[start+','+end] += num

    return list(route_totals.items())

def extractMatrixA(iterator:Iterable[tuple[str, int]]):
    """Group per-route counts by start station within one partition.

    Intended for RDD.mapPartitions: only the records handed to this call are
    grouped, so the same start station can still be emitted by other
    partitions.

    Args:
        iterator: ("<start>,<end>", count) tuples, as produced by extractRdd.

    Returns:
        A list of (start, [(end, count), ...]) tuples. Start stations and
        their destinations keep first-seen order.

    Raises:
        ValueError: If a key does not contain exactly one comma.
    """
    # The built-in list is the factory here: typing.List cannot be
    # instantiated, so defaultdict(List) would raise TypeError on the first
    # missing key unless every key is pre-seeded by hand.
    destinations = defaultdict(list)

    for route, num in iterator:
        start, end = route.split(',')
        destinations[start].append((end, num))

    return list(destinations.items())

    

# extractRdd and extractMatrixA only aggregate inside a single partition. When
# the input is split into more than one partition, the same route (or the same
# start station) would otherwise show up once per partition with partial
# results, so each per-partition step is followed by a reduceByKey that merges
# the partial results across partitions.
rdd = (
    data_rdd
    .filter(fiilterHead)                 # drop the CSV header line
    .mapPartitions(extractRdd)           # per-partition ("start,end", count) partial sums
    .reduceByKey(lambda a, b: a + b)     # one total count per route
    .mapPartitions(extractMatrixA)       # per-partition (start, [(end, count), ...])
    .reduceByKey(lambda a, b: a + b)     # concatenate the lists of a start station seen in several partitions
)
## Resulting RDD layout:
## (start station, [(end station, total count for that route), ...])
rdd.take(25)


[('ST-1041', [('ST-495', 2), ('ST-1092', 2)]),
 ('ST-1046',
  [('ST-3209', 2),
   ('ST-1874', 2),
   ('ST-541', 2),
   ('ST-1046', 1),
   ('ST-1684', 1),
   ('ST-1041', 2)]),
 ('ST-105',
  [('ST-987', 2),
   ('ST-361', 4),
   ('ST-259', 1),
   ('ST-1616', 1),
   ('ST-2320', 1)]),
 ('ST-1061',
  [('ST-1626', 2),
   ('ST-1044', 2),
   ('ST-1061', 4),
   ('ST-1420', 4),
   ('ST-495', 4),
   ('ST-2749', 4),
   ('ST-1842', 2),
   ('ST-493', 1),
   ('ST-1621', 1),
   ('ST-1837', 2),
   ('ST-1834', 1)]),
 ('ST-1069', [('ST-2551', 2)]),
 ('ST-1083', [('ST-1684', 4), ('ST-2740', 2), ('ST-1049', 2)]),
 ('ST-1098', [('ST-1671', 2), ('ST-1280', 1), ('ST-2846', 1)]),
 ('ST-1147', [('ST-625', 2)]),
 ('ST-1184', [('ST-2847', 2)]),
 ('ST-1215', [('ST-1347', 2)]),
 ('ST-1251',
  [('ST-1065', 2),
   ('ST-516', 4),
   ('ST-3084', 2),
   ('ST-525', 1),
   ('ST-2478', 1),
   ('ST-2482', 2),
   ('ST-1645', 1)]),
 ('ST-1252', [('ST-1514', 2), ('ST-2052', 2), ('ST-1062', 1)]),
 ('ST-1259', [('ST-705', 2), ('S