# Spark Local & Spark Client mode 
- pyspark를 이용해 local mode로 스파크를 사용하는 샘플 코드  
- pyspark를 이용해 cluster mode에 client로 연결해 스파크를 사용하는 샘플 코드  

Spark, Hadoop 를 별도로 설치하지 않고, pyspark 만을 설치한 상태에서 실행할 수 있다.  
단, Spark 자체가 jvm에서 작동하기 때문에 JDK 는 설치되어 있어야 한다.  
  
주피터에서 spark 실행 시, jvm 연동이 정상적으로 되게 하기 위해서는 jupyter 실행 전에 
미리 `JAVA_HOME`, `PATH`를 설정해 두는 것이 좋다.  
  
pyspark를 설치한 경우 `SPARK_HOME`은 `pyspark`를 실행할 수 있는 `bin/pyspark`(which pyspark로 확인)가 위치한 경로를 말한다.  

In [1]:
!which pyspark

/usr/local/bin/pyspark


In [2]:
# import libraries 
import socket
import sys
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
import time 

In [13]:
# Global settings shared by the notebook cells
SPARK_APP_NAME = "DATA-Preparation"
SPARK_LOCAL_MASTER = "local[3]"  # master URL for local mode
SPARK_CLUSTER_MASTER = "spark://34.64.108.172:7077"  # master URL for client mode
HOST_NAME, PORT = socket.gethostname(), 9999

# Data location, rooted at the current working directory
DATA_ROOT = f'{os.getcwd()}/data'
DATA_PATH = DATA_ROOT

In [10]:
!pwd

/tf/notebooks/notebooks


In [11]:
# JAVA_HOME / PATH are expected to be exported before Jupyter starts; uncomment to override here.
# os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64/'
# os.environ['PATH'] = '/usr/local/sbin:/usr/local/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin/'

# NOTE(review): verify that /usr/local really contains a complete Spark layout
# (bin/ and jars/). With a pip-installed pyspark it may be enough to leave
# SPARK_HOME unset -- TODO confirm.
os.environ['SPARK_HOME'] = '/usr/local'
os.environ['HADOOP_HOME'] = '/hadoop-3.2.2'
# NOTE(review): `hadoop.home.dir` is usually a JVM system property; confirm that
# exporting it as an environment variable has the intended effect.
os.environ['hadoop.home.dir'] = '/hadoop-3.2.2/bin'

# Python does not expand "$CLASSPATH" inside a string literal, so extend the
# existing value explicitly. A stale literal "$CLASSPATH" entry is dropped and
# the archive is added only once, so re-running the cell is harmless.
_SPARK_ARCHIVE = '/hadoop-3.2.2/spark-3.2.0-bin-hadoop3.2.tar'
_classpath_entries = [
    entry
    for entry in os.environ.get('CLASSPATH', '').split(':')
    if entry and entry != '$CLASSPATH'
]
if _SPARK_ARCHIVE not in _classpath_entries:
    _classpath_entries.append(_SPARK_ARCHIVE)
os.environ['CLASSPATH'] = ':'.join(_classpath_entries)

print(os.getenv('HADOOP_HOME'))

/hadoop-3.2.2


In [12]:
# Show the JVM-related environment variables as seen by this kernel.
print(f"JAVA_HOME : {os.getenv('JAVA_HOME')}")
print(f"PATH : {os.getenv('PATH')}")
# Trailing expression: the notebook displays the current working directory.
os.getcwd()

JAVA_HOME : /usr/lib/jvm/java-8-openjdk-amd64/
PATH : /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin/


'/tf/notebooks/notebooks'

In [14]:
# Spark session factory helpers
def init_remote_session():
    """Create (or reuse) a SparkSession attached to the cluster master.

    The master URL and application name come from the module-level
    ``SPARK_CLUSTER_MASTER`` and ``SPARK_APP_NAME`` settings. The effective
    configuration of the session's context is printed.

    Returns:
        SparkSession: the session bound to ``SPARK_CLUSTER_MASTER``.
    """
    spark = (
        SparkSession.builder
        .master(SPARK_CLUSTER_MASTER)
        .appName(SPARK_APP_NAME)
        .getOrCreate()
    )
    # Read the configuration from this session's own context instead of a
    # notebook-global `sc`, which may not exist yet when this is called.
    print(spark.sparkContext.getConf().getAll())
    return spark
    
# Local mode sometimes needs more memory, so the session is rebuilt with an adjusted conf.
def init_local_session():
    """Create a local-mode SparkSession with customised configuration.

    A first session is started to obtain the default configuration, the
    overrides are applied to a copy of it, the first session is stopped, and
    a new session is built from the updated configuration. The master URL,
    application name and driver host come from the module-level
    ``SPARK_LOCAL_MASTER``, ``SPARK_APP_NAME`` and ``HOST_NAME`` settings.

    Returns:
        SparkSession: the session created from the updated configuration.
    """
    spark = (
        SparkSession.builder
        .master(SPARK_LOCAL_MASTER)
        .appName(SPARK_APP_NAME)
        .config('spark.driver.host', HOST_NAME)
        .getOrCreate()
    )
    # getConf() hands back a copy, so it can be edited and reused after the
    # first context has been stopped.
    conf = spark.sparkContext.getConf().setAll([
        ('spark.executor.instances', '1'),
        ('spark.driver.allowMultipleContexts', 'true'),
        ('spark.sql.shuffle.partitions', '8'),
        # Optional memory overrides:
        # ('spark.driver.memory', '12g'), ('spark.executor.memory', '8g'),
        # ('spark.driver.maxResultSize', '8g'),
        # ('spark.memory.offHeap.enabled', 'true'), ('spark.memory.offHeap.size', '8g'),
    ])
    spark.stop()

    # Pass the updated SparkConf itself; config(conf=...) expects a SparkConf,
    # not the list of pairs returned by getAll().
    spark = (
        SparkSession.builder
        .master(SPARK_LOCAL_MASTER)
        .appName(SPARK_APP_NAME)
        .config(conf=conf)
        .getOrCreate()
    )
    new_conf = spark.sparkContext.getConf().getAll()
    print(f'Updated Conf : {new_conf}')
    return spark
    

In [15]:
# Create the local-mode Spark session and keep a handle to its SparkContext.
spark = init_local_session()
sc = spark.sparkContext
# Trailing expression: the notebook displays the SparkContext.
sc

RuntimeError: Java gateway process exited before sending its port number

In [13]:
# channel, day, seg, setop, 1000

# Build n set-top identifiers
def create_setops(setop_count=10000):
    """Generate set-top identifiers spread evenly over five name prefixes.

    Each identifier has the form ``<prefix>_<index>`` where the index is
    zero-padded to at least three digits. The last ten identifiers are
    printed as a quick sanity check.

    Args:
        setop_count: total number of identifiers to generate; it is divided
            evenly (integer division) among the prefixes.

    Returns:
        list[str]: identifiers grouped by prefix, in prefix order.
    """
    setop_names = ['ST_A', 'ST_B', 'ST_C', 'ST_D', 'ST_E']
    per_name = setop_count // len(setop_names)
    setops = []
    for name in setop_names:
        setops.extend(f'{name}_{i:03d}' for i in range(per_name))

    print(setops[-10:])
    return setops

# Build the channel / day / segment combinations
def create_others():
    """Return every ``[channel, day, seg]`` combination.

    The combinations are produced channel-major, then by day, then by
    segment. The first five rows are printed as a quick sanity check.

    Returns:
        list[list[str]]: one three-element list per combination.
    """
    from itertools import product

    # 20 channels
    channels = ['KBS', 'MBC', 'SBS', 'JTBC', 'CBS', 'OCN', 'TVN', 'TVCH', 'BTN', 'EBS',
                'Arirang', 'JTV', 'GAME-TV', 'HBC', 'BBC', 'CNN', 'CNBC', 'CCN', 'NHK', 'ABC']
    days = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
    segs = ['Agriculture', 'Game']

    # Rows stay lists (not tuples) because callers concatenate lists onto them.
    rows = [list(combo) for combo in product(channels, days, segs)]

    print(rows[:5])
    return rows

def merge_to_inventory(setops, rows):
    """Cross every row with every set-top id to form inventory records.

    Each record is the row's fields followed by the set-top id and a fixed
    value of 10000. The number of records is printed.

    Args:
        setops: iterable of set-top identifiers.
        rows: iterable of lists to be extended.

    Returns:
        list[list]: one record per (row, set-top) pair, row-major.
    """
    fixed_time = 10000  # hard-coded time value used for every record
    invens = [base + [setop, fixed_time] for base in rows for setop in setops]
    print(f'Inventory Length : {len(invens):,}')
    return invens

def define_schema():
    """Build the inventory schema.

    Returns:
        StructType: string columns ``channel``, ``day``, ``seg`` and
        ``setop`` followed by a long column ``remains``.
    """
    from pyspark.sql.types import LongType, StringType, StructField, StructType

    text_columns = ("channel", "day", "seg", "setop")
    fields = [StructField(name, StringType()) for name in text_columns]
    fields.append(StructField("remains", LongType()))
    return StructType(fields)

def save_inventory(invens, spark_session=None, file_name=f'{DATA_PATH}/inven', sample_count=10000):
    """Write inventory records to CSV through Spark.

    Args:
        invens: list of records matching the schema from ``define_schema()``.
        spark_session: session used for the write. When omitted, the
            notebook-global ``spark`` is looked up at call time, so this
            function can be defined before a session exists.
        file_name: output path handed to the DataFrame writer.
        sample_count: number of leading records to write; a value of zero or
            less writes all records.

    The write uses ``mode='append'``, so repeated calls add to existing output.
    """
    if spark_session is None:
        spark_session = spark  # resolved when called, not when defined

    # Either the full data set or only the leading sample.
    records = invens if sample_count <= 0 else invens[:sample_count]
    rdd = spark_session.sparkContext.parallelize(records)
    df = spark_session.createDataFrame(rdd, define_schema())
    df.write.save(path=file_name, format='csv', mode='append', sep=',')
        

In [14]:
%%time
# Generate the set-top identifier list.
setops = create_setops()

['ST_E_1990', 'ST_E_1991', 'ST_E_1992', 'ST_E_1993', 'ST_E_1994', 'ST_E_1995', 'ST_E_1996', 'ST_E_1997', 'ST_E_1998', 'ST_E_1999']
CPU times: user 8.11 ms, sys: 0 ns, total: 8.11 ms
Wall time: 8.41 ms


In [15]:
%%time
# Generate the channel / day / segment combinations.
rows = create_others()

[['KBS', 'mon', 'Agriculture'], ['KBS', 'mon', 'Game'], ['KBS', 'tue', 'Agriculture'], ['KBS', 'tue', 'Game'], ['KBS', 'wed', 'Agriculture']]
CPU times: user 1.63 ms, sys: 0 ns, total: 1.63 ms
Wall time: 1.74 ms


In [16]:
%%time
# Combine every row with every set-top id into inventory records.
invens = merge_to_inventory(setops, rows)

Inventory Length : 2,800,000
CPU times: user 1.97 s, sys: 257 ms, total: 2.22 s
Wall time: 2.22 s


In [None]:
# Save the inventory records to files
TABLE_NAME = f'{DATA_PATH}/inven'
SAMPLING_COUNT = 10_000_000

print(f'SAMPLING_COUNT : {SAMPLING_COUNT:,}')
print(f'TABLE_NAME : {TABLE_NAME}')
print(f'Inventory Length : {len(invens):,}')

save_inventory(
    invens,
    spark_session=spark,
    file_name=TABLE_NAME,
    sample_count=SAMPLING_COUNT,
)

SAMPLING_COUNT : 10,000,000
TABLE_NAME : /tf/notebooks/data/inven
Inventory Length : 2,800,000


In [None]:
# Verify the saved result: read it back, count the rows, preview a few
lines = spark.read.load(path=TABLE_NAME, format='csv', schema=define_schema())
data_count = lines.count()
print(f'DATA Count : {data_count:,}')
lines.show(5)

In [None]:
# Show 5 rows per channel, ordered by set-top id (descending)
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

windowPart = Window.partitionBy('channel').orderBy(col('setop').desc())
lines = spark.read.format('csv').schema(define_schema()).option('path', TABLE_NAME).load()
# Number the rows inside each channel partition and keep the first five.
df2 = lines.withColumn('row', row_number().over(windowPart)).filter(col('row') <= 5)
df2.show(50)