### Spark , mysql DB연결

In [6]:
# pyspark --jars C:\env\mysql-connector-java-8.0.32.jar
import findspark
findspark.init()

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()

df = spark.read.format('jdbc').options(         # JDBC 드라이버로 데이터베이스에서 데이터를 읽어옵니다.
        driver="com.mysql.jdbc.Driver",         # JDBC 드라이버 클래스 이름을 지정합니다.
        url="jdbc:mysql://localhost:3306/sky",  # 데이터베이스의 URL을 지정합니다.
        dbtable="customers",                    # 데이터베이스에서 읽어올 테이블의 이름을 지정합니다.
        user="root",                            # 데이터베이스에 접속할 때 사용할 사용자 이름을 지정합니다.
        password="Leehaneul12#"                 # 데이터베이스에 접속할 때 사용할 비밀번호를 지정합니다.
    ) \
    .load()  # 설정한 옵션들을 기반으로 데이터를 로드합니다.

df.show()


+---+----------+---------+--------------------+
| id|first_name|last_name|               email|
+---+----------+---------+--------------------+
|  1|      John|      Doe| johndoe@example.com|
|  2|      Jane|    Smith|janesmith@example...|
|  3|      Mark|  Johnson|markjohnson@examp...|
|  4|        Jo|       Do|    jodo@example.com|
|  5|        Ja|      Smi|   jasmi@example.com|
|  6|        Ma|      Joh|   majoh@example.com|
+---+----------+---------+--------------------+



### 새로운 데이터프레임을 생성합니다.


In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


new_customers_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(),True)
])
new_customers_data = [(7, "Alice", "Kim","alice@example.com"), (8, "Bob", "jon","bob@example.com")]
new_customers = spark.createDataFrame(new_customers_data, schema=new_customers_schema)

# 불러온 데이터프레임과 새로운 데이터프레임을 합칩니다.
merged_df = df.union(new_customers)

merged_df.show()

# 새로운 테이블을 생성합니다.
# merged_df 데이터프레임을 사용하여 merged_customers 임시 테이블을 생성합니다.
merged_df.createOrReplaceTempView("merged_customers")
merged_df.write.format('jdbc').options(
        url='jdbc:mysql://localhost:3306/sky',
        driver='com.mysql.jdbc.Driver',
        dbtable='ab_customers',
        user='root',
        password='Leehaneul12#'
    ).mode('overwrite').save()


+---+----------+---------+--------------------+
| id|first_name|last_name|               email|
+---+----------+---------+--------------------+
|  1|      John|      Doe| johndoe@example.com|
|  2|      Jane|    Smith|janesmith@example...|
|  3|      Mark|  Johnson|markjohnson@examp...|
|  4|        Jo|       Do|    jodo@example.com|
|  5|        Ja|      Smi|   jasmi@example.com|
|  6|        Ma|      Joh|   majoh@example.com|
|  7|     Alice|      Kim|   alice@example.com|
|  8|       Bob|      jon|     bob@example.com|
+---+----------+---------+--------------------+



### 행추가


In [7]:
# 새로운 데이터를 생성합니다. 행 추가! 
new_data = [(9, "sky", "lee", "sky@example.com")]

# 새로운 데이터를 포함하는 데이터프레임을 생성합니다.
new_df = spark.createDataFrame(new_data, ["id", "first_name", "last_name", "email"])

# 데이터베이스에 새로운 데이터를 추가합니다.
new_df.write.format('jdbc').options(
        driver='com.mysql.jdbc.Driver',
        url='jdbc:mysql://localhost:3306/sky',
        dbtable='merged_customers',
        user='root',
        password='Leehaneul12#'
    ).mode('append').save()

#확인해보기
df = spark.read.format('jdbc').options(         
        driver="com.mysql.jdbc.Driver",         
        url="jdbc:mysql://localhost:3306/sky",  
        dbtable="merged_customers",             
        user="root",                            
        password="Leehaneul12#"                     ) \
    .load()  # 설정한 옵션들을 기반으로 데이터를 로드합니다.

df.show()


+---+----------+---------+--------------------+
| id|first_name|last_name|               email|
+---+----------+---------+--------------------+
|  1|      John|      Doe| johndoe@example.com|
|  2|      Jane|    Smith|janesmith@example...|
|  3|      Mark|  Johnson|markjohnson@examp...|
|  4|        Jo|       Do|    jodo@example.com|
|  5|        Ja|      Smi|   jasmi@example.com|
|  6|        Ma|      Joh|   majoh@example.com|
|  7|     Alice|      Kim|   alice@example.com|
|  8|       Bob|      jon|     bob@example.com|
|  9|       sky|      lee|     sky@example.com|
|  9|       sky|      lee|     sky@example.com|
|  9|       sky|      lee|     sky@example.com|
+---+----------+---------+--------------------+



### 열추가

In [8]:

from pyspark.sql.functions import col, substring, when

df_new = df.withColumn("gender", 
                       when(substring(col("last_name"), 1, 1).between("A", "J"), "man")
                       .otherwise("woman"))

df_new.createOrReplaceTempView("merged_customers")
df_new.write.format('jdbc').options(
        url='jdbc:mysql://localhost:3306/sky',
        driver='com.mysql.jdbc.Driver',
        dbtable='ab_customers',
        user='root',
        password='Leehaneul12#'
    ).mode('overwrite').save()

#확인해보기
df = spark.read.format('jdbc').options(         
        driver="com.mysql.jdbc.Driver",         
        url="jdbc:mysql://localhost:3306/sky",  
        dbtable="ab_customers",             
        user="root",                            
        password="Leehaneul12#"                     ) \
    .load()  # 설정한 옵션들을 기반으로 데이터를 로드합니다.

df.show()

+---+----------+---------+--------------------+------+
| id|first_name|last_name|               email|gender|
+---+----------+---------+--------------------+------+
|  1|      John|      Doe| johndoe@example.com|   man|
|  2|      Jane|    Smith|janesmith@example...| woman|
|  3|      Mark|  Johnson|markjohnson@examp...|   man|
|  4|        Jo|       Do|    jodo@example.com|   man|
|  5|        Ja|      Smi|   jasmi@example.com| woman|
|  6|        Ma|      Joh|   majoh@example.com|   man|
|  7|     Alice|      Kim|   alice@example.com| woman|
|  8|       Bob|      jon|     bob@example.com| woman|
|  9|       sky|      lee|     sky@example.com| woman|
|  9|       sky|      lee|     sky@example.com| woman|
|  9|       sky|      lee|     sky@example.com| woman|
+---+----------+---------+--------------------+------+



##### Spark 내장함수와 SQL문. 둘 다 같은 결과를 도출

In [56]:
df_new.createTempView('df')
df_sql = spark.sql("""SELECT *,   
    CASE 
        WHEN substring(last_name, 1, 1) BETWEEN 'A' AND 'J' THEN 'man'
        ELSE 'woman'
    END AS gender
FROM df""")

#위는 sql문, 밑은 spark 내장 함수를 활용

df_new = df.withColumn("gender", 
                       when(substring(col("last_name"), 1, 1).between("A", "J"), "man")
                       .otherwise("woman"))

df_sql.show()

### Join 연습

In [54]:
# 첫번째 테이블 생성
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

data1 = [(1, "John"), (2, "Jane"), (3, "Mark"), (4, "Jo")]
schema1 = StructType([StructField("id", IntegerType(), True), StructField("name", StringType(), True)])
df1 = spark.createDataFrame(data1, schema1)

# 두번째 테이블 생성
data2 = [(1, "johndoe@example.com"), (2, "janesmith@example.com"), (3, "markjohnson@example.com"), (4, "jasmi@example.com")]
schema2 = StructType([StructField("id", IntegerType(), True), StructField("email", StringType(), True)])
df2 = spark.createDataFrame(data2, schema2)

# join 연습
df_join = df1.join(df2, ["id"])
df_join.show()


+---+----+--------------------+
| id|name|               email|
+---+----+--------------------+
|  1|John| johndoe@example.com|
|  2|Jane|janesmith@example...|
|  3|Mark|markjohnson@examp...|
|  4|  Jo|   jasmi@example.com|
+---+----+--------------------+



In [16]:

df_new.show()

+---+----------+---------+--------------------+------+
| id|first_name|last_name|               email|gender|
+---+----------+---------+--------------------+------+
|  1|      John|      Doe| johndoe@example.com|   man|
|  2|      Jane|    Smith|janesmith@example...| woman|
|  3|      Mark|  Johnson|markjohnson@examp...|   man|
|  4|        Jo|       Do|    jodo@example.com|   man|
|  5|        Ja|      Smi|   jasmi@example.com| woman|
|  6|        Ma|      Joh|   majoh@example.com|   man|
|  7|     Alice|      Kim|   alice@example.com| woman|
|  8|       Bob|      jon|     bob@example.com| woman|
|  9|       sky|      lee|     sky@example.com| woman|
|  9|       sky|      lee|     sky@example.com| woman|
|  9|       sky|      lee|     sky@example.com| woman|
+---+----------+---------+--------------------+------+



In [17]:
from pyspark.sql import SparkSession, Row

import random
# part 컬럼 추가

part = ["Service", "Clean", "Market", "economic"]

result = [Row(ID=i, department=part[random.randint(0, 3)]) for i in range(df_new.count())]

spark = SparkSession.builder.appName("Random Dataframe").getOrCreate()

df_part = spark.createDataFrame(result)

df_part.show()

# join 연습
df_join_part = df_new.join(df_part, ["id"]) 
df_join_part.show()

+---+----------+
| ID|department|
+---+----------+
|  0|    Market|
|  1|  economic|
|  2|  economic|
|  3|    Market|
|  4|     Clean|
|  5|  economic|
|  6|    Market|
|  7|   Service|
|  8|     Clean|
|  9|   Service|
| 10|     Clean|
+---+----------+



# TimeStamp 찍기

In [25]:
from pyspark.sql.functions import current_date,date_format
# # df_join_part = df_join_part.drop("Timestamp") #drop 예제


df_join_part = df_join_part.withColumn("Timestamp",date_format(current_date(), "yyyy-MM-dd"))
df_drop_duplicates = df_join_part.dropDuplicates()  #중복된 행 제거
df_drop_duplicates.show()

df_drop_duplicates.createOrReplaceTempView("distinct")
df_drop_duplicates.write.format('jdbc').options(
        url='jdbc:mysql://localhost:3306/sky',
        driver='com.mysql.jdbc.Driver',
        dbtable='part1',
        user='root',
        password='Leehaneul12#'
    ).mode('overwrite').save()
df_drop_duplicates.show()




+---+----------+---------+--------------------+------+----------+----------+
| id|first_name|last_name|               email|gender|department| Timestamp|
+---+----------+---------+--------------------+------+----------+----------+
|  5|        Ja|      Smi|   jasmi@example.com| woman|  economic|2023-04-20|
|  9|       sky|      lee|     sky@example.com| woman|   Service|2023-04-20|
|  2|      Jane|    Smith|janesmith@example...| woman|  economic|2023-04-20|
|  7|     Alice|      Kim|   alice@example.com| woman|   Service|2023-04-20|
|  3|      Mark|  Johnson|markjohnson@examp...|   man|    Market|2023-04-20|
|  1|      John|      Doe| johndoe@example.com|   man|  economic|2023-04-20|
|  6|        Ma|      Joh|   majoh@example.com|   man|    Market|2023-04-20|
|  4|        Jo|       Do|    jodo@example.com|   man|     Clean|2023-04-20|
|  8|       Bob|      jon|     bob@example.com| woman|     Clean|2023-04-20|
+---+----------+---------+--------------------+------+----------+----------+

In [30]:
from pyspark.sql.types import IntegerType,TimestampType,StringType
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import asc

aaa  = df_drop_duplicates.select(
    df_drop_duplicates["ID"].cast(IntegerType()),
    df_drop_duplicates["first_name"],
    df_drop_duplicates["last_name"],
    df_drop_duplicates["email"],
    df_drop_duplicates["gender"],
    df_drop_duplicates["department"].cast(StringType()),
    df_drop_duplicates["Timestamp"].cast(TimestampType())
)

ab =aaa.orderBy(asc("ID"))
ab.show()

+---+----------+---------+--------------------+------+----------+-------------------+
| ID|first_name|last_name|               email|gender|department|          Timestamp|
+---+----------+---------+--------------------+------+----------+-------------------+
|  1|      John|      Doe| johndoe@example.com|   man|  economic|2023-04-20 00:00:00|
|  2|      Jane|    Smith|janesmith@example...| woman|  economic|2023-04-20 00:00:00|
|  3|      Mark|  Johnson|markjohnson@examp...|   man|    Market|2023-04-20 00:00:00|
|  4|        Jo|       Do|    jodo@example.com|   man|     Clean|2023-04-20 00:00:00|
|  5|        Ja|      Smi|   jasmi@example.com| woman|  economic|2023-04-20 00:00:00|
|  6|        Ma|      Joh|   majoh@example.com|   man|    Market|2023-04-20 00:00:00|
|  7|     Alice|      Kim|   alice@example.com| woman|   Service|2023-04-20 00:00:00|
|  8|       Bob|      jon|     bob@example.com| woman|     Clean|2023-04-20 00:00:00|
|  9|       sky|      lee|     sky@example.com| woman|