# Spark SQL 을 이용하여 mysql 데이터베이스에 데이터 저장하기

### 필요한 라이브러리 임포트

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
from pyspark.sql.functions import col, lit

### SparkContext, SparkSession 생성
### mysql 에 접속하기 위해서 connector 관련 설정을 해줘야 한다.

스파크가 JVM 위에서 실행되기 때문에   
DB 에 접속할 때 자바의 표준 데이터베이스 접속 기술인 JDBC 를 사용하고   
자바를 통해서 mysql 에 접속해야 하기 때문에 JDBC 용 mysql connector 를 추가해 줘야 한다.  

In [2]:
conf = SparkConf() \
    .setMaster('local') \
    .setAppName('sql') \
    .set("spark.driver.extraClassPath","jdbc/mysql-connector-java-5.1.44.jar")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

### DB 접속정보 설정

In [17]:
# DB 접속정보 설정
DB_URL = ''
DB_USER = ''
DB_PASS = ''

### 데이터가 저장되어있는 csv 파일을 DataFrame 으로 읽어들인다.

In [4]:
# 옵션 설명
# inferSchema : 데이터를 기반으로 타입을 유추(기본값 : False)
# header : 첫번째 행을 컬럼 값으로 사용

df = spark.read.csv("data/data.csv", inferSchema = True, header = True)
df.show(2)

+---+---+---+----+---+---+----+-----------+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---+---+-----+-----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+---+----+----+----+----+---+------+------+------+------+------+---+---+------+------+------+------+------+------+------+------+---+---+----+---+-----+-----+-----+-----+---+-----+-----+-----+-----+-----+---+---------+---------+-------+-------+-------+-------+-------+-----+-----+-----+-----+-----+-----+-------+-------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-

### 컬럼 정보 확인

In [5]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[ID: int, DM1: int, DM2: int, DM3: int, DM4: int, KK1: int, KK2: int, WT: double, Q1A_1: int, Q1A_2: int, Q1A_3: int, Q1A_4: int, Q1A_5: int, Q1A_6: int, Q1A_7: int, Q1A_8: int, Q1A_9: int, Q1A_10: int, Q1A_11: int, Q1A_12: int, Q1A_13: int, Q1A_14: int, Q1A_15: int, Q1A_16: int, Q1A_17: int, Q1A_18: int, Q1A_19: int, Q1A_20: int, Q1B_1: int, Q1B_2: int, Q1B_3: int, Q1B_4: int, Q1B_5: int, Q1B_6: int, Q1B_7: int, Q1B_8: int, Q1B_9: int, Q1B_10: int, Q1B_11: int, Q1B_12: int, Q1B_13: int, Q1B_14: int, Q1B_15: int, Q1B_16: int, Q1B_17: int, Q1B_18: int, Q1B_19: int, Q1B_20: int, Q1_1A1: int, Q1_1A2: int, Q1_1A3: int, Q1_1B1: int, Q1_1B2: int, Q1_1B3: int, Q2A: int, Q2B: int, Q2_1A: int, Q2_1B: int, Q2_2: int, Q3A_1: int, Q3A_2: int, Q3A_3: int, Q3A_4: int, Q3A_5: int, Q3A_6: int, Q3A_7: int, Q3A_8: int, Q3A_9: int, Q3B_1: int, Q3B_2: int, Q3B_3: int, Q3B_4: int, Q3B_5: int, Q3B_6: int, Q3B_7: int, Q3B_8: int, Q3B_9: int, Q3B_10: int, Q3C_1:

In [6]:
# DataFrame 타입인 것을 확인
print(type(df))

# 몇몇 컬럼만 선택한 결과는 새로운 DataFrame 으로 반완됨
print(type(df.select('ID', 'DM1', 'DM2')))

# 데이터 조회
df.select('ID', 'DM1', 'DM2').show(5)

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
+---+---+---+
| ID|DM1|DM2|
+---+---+---+
|  1|  3|  1|
|  2|  3|  2|
|  3|  2|  1|
|  4|  2|  2|
|  5|  3|  1|
+---+---+---+
only showing top 5 rows



### 응답자 정보 분리[ID(id), DM1(대상구분), DM2(성별)]

In [7]:
df_respondent = df.select('ID', 'DM1', 'DM2')
df_respondent.show(5)

+---+---+---+
| ID|DM1|DM2|
+---+---+---+
|  1|  3|  1|
|  2|  3|  2|
|  3|  2|  1|
|  4|  2|  2|
|  5|  3|  1|
+---+---+---+
only showing top 5 rows



In [None]:
### DB 에 저장
df_respondent \
    .write \
    .format("jdbc") \
    .mode('append') \
    .option("url", DB_URL) \
    .option("dbtable", "respondent") \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .save()

### 최진영님 정보 분리[ID(id), Q1A_1 ~ 20, Q4]

In [18]:
columns = ['ID'] + ['Q1A_' + str(i) for i in range(1, 21)] + ['Q4']
df_cjy = df.select(columns)
df_cjy.show(5)

+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+---+
| ID|Q1A_1|Q1A_2|Q1A_3|Q1A_4|Q1A_5|Q1A_6|Q1A_7|Q1A_8|Q1A_9|Q1A_10|Q1A_11|Q1A_12|Q1A_13|Q1A_14|Q1A_15|Q1A_16|Q1A_17|Q1A_18|Q1A_19|Q1A_20| Q4|
+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+---+
|  1|    1|    1|    1|    1|    1|    0|    0|    1|    0|     1|     0|     0|     1|     1|     1|     0|     1|     0|     1|     1|  3|
|  2|    1|    1|    1|    1|    1|    0|    0|    1|    1|     0|     0|     0|     1|     1|     1|     0|     1|     1|     1|     1|  3|
|  3|    1|    1|    1|    1|    1|    1|    0|    1|    0|     0|     0|     0|     1|     1|     1|     0|     0|     0|     0|     1|  3|
|  4|    1|    1|    1|    1|    1|    0|    0|    1|    0|     0|     0|     0|     1|     1|     1|     1|     1|     1|     1|     1|  3|
|  5|    1|  

In [20]:
### DB 에 저장
df_cjy \
    .write \
    .format("jdbc") \
    .mode('append') \
    .option("url", DB_URL) \
    .option("dbtable", "response_cjy") \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .save()

### 김아영님 정보 분리[ID(id), Q9A, Q10]

In [21]:
df_kay = df.select('ID', 'Q9A', 'Q10')
df_kay.show(5)

+---+---+---+
| ID|Q9A|Q10|
+---+---+---+
|  1|  1|  1|
|  2|  1|  1|
|  3|  1|  1|
|  4|  1|  1|
|  5|  1|  1|
+---+---+---+
only showing top 5 rows



In [23]:
### DB 에 저장
df_kay \
    .write \
    .format("jdbc") \
    .mode('append') \
    .option("url", DB_URL) \
    .option("dbtable", "response_kay") \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .save()

### 김유민님 정보 분리[ID(id), Q8, Q8_1_1~6]

In [24]:
columns = ['ID', 'Q8'] + ['Q8_1_' + str(i) for i in range(1, 7)]
df_kym = df.select(columns)
df_kym.show(5)

+---+---+------+------+------+------+------+------+
| ID| Q8|Q8_1_1|Q8_1_2|Q8_1_3|Q8_1_4|Q8_1_5|Q8_1_6|
+---+---+------+------+------+------+------+------+
|  1|  1|     1|     2|  null|  null|  null|  null|
|  2|  1|     1|     2|  null|  null|  null|  null|
|  3|  1|     3|     4|     5|  null|  null|  null|
|  4|  1|     2|     3|     4|  null|  null|  null|
|  5|  1|     1|     2|  null|  null|  null|  null|
+---+---+------+------+------+------+------+------+
only showing top 5 rows



In [25]:
### DB 에 저장
df_kym \
    .write \
    .format("jdbc") \
    .mode('append') \
    .option("url", DB_URL) \
    .option("dbtable", "response_kym") \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .save()

### 윤재철님 정보 분리[ID(id), Q12, Q12_1]

In [26]:
df_yjc = df.select('ID', 'Q12', 'Q12_1')
df_yjc.show(5)

+---+---+-----+
| ID|Q12|Q12_1|
+---+---+-----+
|  1|  3|    1|
|  2|  3|    2|
|  3|  3|    1|
|  4|  3|    1|
|  5|  3|    2|
+---+---+-----+
only showing top 5 rows



In [27]:
### DB 에 저장
df_yjc \
    .write \
    .format("jdbc") \
    .mode('append') \
    .option("url", DB_URL) \
    .option("dbtable", "response_yjc") \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .save()

### 과의존 여부 판단을 위한 학습 데이터 분리

- DM1 선택에 따라 타겟 컬럼이 달라진다

In [None]:
# 유아동은 컬럼이 9번까지 있어서 별도 처리를 해야하기 때문에
# 10번 컬럼을 임의로 만들어서 None 값으로 넣어준다

### int type에 컬럼 Q3A_10 을 추가하고 데이터는 None 으로 채운다.
df_fit = df.withColumn('Q3A_10', lit(None).cast('int'))

def insertData(dm1, grp):
    columns = ['ID']
    
    for i in range(1, 11):
        
        # col('Q3{}_{}'.format(grp, i)) => 컬럼을 선택
        # .alias('Q3_{}'.format(i)) => 이름을 변경
        
        # dm1 : 1, grp : A 일 때
        # Q3A_1 ~ Q3A_10 => Q3_1 ~ Q3_10

        column = col('Q3{}_{}'.format(grp, i)).alias('Q3_{}'.format(i))
        columns = columns + [column]
    
    columns = columns + ['KK1']
    
    # df_fit.select(columns) : 선언한 컬럼들만 추출
    # .filter('DM1 == {}'.format(dm1)) : DM1 값이 dm1 인 row 만 필터
        # pandas dataframe 의 df[df['DM1'] == 1] 과 같다
    df_questions = df_fit.select(columns).filter('DM1 == {}'.format(dm1))
    
    ### DB 에 저장
    ### mode : append => 테이블에 데이터만 추가하는 옵션
    df_questions \
        .write \
        .format("jdbc") \
        .mode('append') \
        .option("url", DB_URL) \
        .option("dbtable", "response_fit") \
        .option("user", DB_USER) \
        .option("password", DB_PASS) \
        .save()
    
    
insertData(1, 'A')
insertData(2, 'B')
insertData(3, 'C')
insertData(4, 'D')