In [7]:
import os

# Tell findspark where the local Spark distribution lives. setdefault keeps a
# SPARK_HOME that is already exported in the environment, so the notebook is
# not tied to one machine's directory layout; the path below is only a fallback.
os.environ.setdefault("SPARK_HOME", "/Users/jihyun/apache-spark/spark-3.0.1-bin-hadoop2.7")

import findspark
# Called before the pyspark import below; it is expected to make pyspark
# importable from the installation referenced by SPARK_HOME.
findspark.init()

from pyspark import SparkContext
# Reuse the active SparkContext if one exists, otherwise create it.
sc = SparkContext.getOrCreate()

In [8]:
# Prepare the Spark SQL entry point on top of the existing SparkContext.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sparkContext=sc)

In [77]:
# Directory holding the sample data (ends with a slash) and the user CSV inside it.
path = "/Users/jihyun/Desktop/lecture/bigdata_pipeline_master/spark_practice/spark_sample/"
file_name = f"{path}user_info.csv"

def processToRDD(line):
    """Split one comma-separated text line into a list of string fields."""
    fields = line.split(",")
    return fields

def processToDF(line):
    """Convert one parsed CSV record into a pyspark Row.

    Args:
        line: sequence of string fields in the order id, name, age,
            num_friends (the shape produced by processToRDD).

    Returns:
        Row with fields id and age converted to int; name and num_friends
        are passed through unchanged (num_friends stays a string).

    Raises:
        ValueError: if the id or age field cannot be parsed by int().
        IndexError: if fewer than four fields are present.
    """
    # Imported here because the notebook's only other Row import lives in a
    # later cell; without it this function raises NameError on a fresh kernel.
    from pyspark.sql import Row

    return Row(id = int(line[0]), name = line[1], age = int(line[2]), num_friends = line[3])

# Read the file as an RDD of text lines, then split every line into its fields.
raw_lines = sc.textFile(file_name)
user_rdd = raw_lines.map(processToRDD)
user_rdd.collect()


[['0', 'Alex', '30', '123'],
 ['1', 'Bert', '32', '234'],
 ['2', 'Curt', '28', '312'],
 ['3', 'Don', '32', '89']]

In [78]:
# Turn every parsed field list into a Row via processToDF.
user_rows = user_rdd.map(processToDF)
user_rows.collect()

[Row(id=0, name='Alex', age=30, num_friends='123'),
 Row(id=1, name='Bert', age=32, num_friends='234'),
 Row(id=2, name='Curt', age=28, num_friends='312'),
 Row(id=3, name='Don', age=32, num_friends='89')]

In [79]:
# RDD -> DataFrame: build a DataFrame from the RDD of Rows and display it.
user_df = sqlContext.createDataFrame(user_rows)
user_df.show()

+---+----+---+-----------+
| id|name|age|num_friends|
+---+----+---+-----------+
|  0|Alex| 30|        123|
|  1|Bert| 32|        234|
|  2|Curt| 28|        312|
|  3| Don| 32|         89|
+---+----+---+-----------+



In [82]:
# Keep only the id and age columns, then retain rows whose age is above 30.
user_filter = (
    user_df
    .select(user_df["id"], user_df["age"])
    .filter(user_df["age"] > 30)
)

user_filter.show()

+---+---+
| id|age|
+---+---+
|  1| 32|
|  3| 32|
+---+---+



In [83]:
# Group by age and take the mean of the grouped data; collect() returns the result as a list of Rows.
user_filter.groupBy("age").mean().collect()

[Row(age=32, avg(id)=2.0, avg(age)=32.0)]

In [85]:
# Per age group, compute the maximum of age and the maximum of id.
user_filter.groupBy("age").agg({"age" : "max", "id": "max"}).show()

+---+-------+--------+
|age|max(id)|max(age)|
+---+-------+--------+
| 32|      3|      32|
+---+-------+--------+



In [87]:
# Register user_df as a temporary view named "user" so it can be queried with SQL.
# createOrReplaceTempView is the replacement for registerTempTable, which has
# been deprecated since Spark 2.0.
user_df.createOrReplaceTempView("user")

output = sqlContext.sql("SELECT * FROM user WHERE age > 30")
output.show()

+---+----+---+-----------+
| id|name|age|num_friends|
+---+----+---+-----------+
|  1|Bert| 32|        234|
|  3| Don| 32|         89|
+---+----+---+-----------+



In [91]:
# UDF (User Defined Function)

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, lit


def _maturity_label(age):
    """Return "adult" when age is 30 or more, otherwise "child"."""
    if age >= 30:
        return "adult"
    return "child"


maturity_udf = udf(_maturity_label, StringType())

# Add a "maturity" column whose value is computed from age by maturity_udf.
user_df.withColumn(
    "maturity",
    maturity_udf(user_df.age),
).show()


+---+----+---+-----------+--------+
| id|name|age|num_friends|maturity|
+---+----+---+-----------+--------+
|  0|Alex| 30|        123|   adult|
|  1|Bert| 32|        234|   adult|
|  2|Curt| 28|        312|   child|
|  3| Don| 32|         89|   adult|
+---+----+---+-----------+--------+



In [14]:
# Same sample-data directory as before; file_name now points at the World Bank JSON file.
path = "/Users/jihyun/Desktop/lecture/bigdata_pipeline_master/spark_practice/spark_sample/"
file_name = f"{path}wb.json"

In [15]:
# Load the JSON file into a DataFrame.
df = sqlContext.read.json(file_name)

In [16]:
# Print the schema of the loaded DataFrame.
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- approvalfy: string (nullable = true)
 |-- board_approval_month: string (nullable = true)
 |-- boardapprovaldate: string (nullable = true)
 |-- borrower: string (nullable = true)
 |-- closingdate: string (nullable = true)
 |-- country_namecode: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- countryname: string (nullable = true)
 |-- countryshortname: string (nullable = true)
 |-- docty: string (nullable = true)
 |-- envassesmentcategorycode: string (nullable = true)
 |-- grantamt: long (nullable = true)
 |-- ibrdcommamt: long (nullable = true)
 |-- id: string (nullable = true)
 |-- idacommamt: long (nullable = true)
 |-- impagency: string (nullable = true)
 |-- lendinginstr: string (nullable = true)
 |-- lendinginstrtype: string (nullable = true)
 |-- lendprojectcost: long (nullable = true)
 |-- majorsector_percent: array (nullable = true)
 |    |-- element: struct (containsNu

In [20]:
# NOTE(review): file_name still points at wb.json here, so the CSV reader is
# parsing a JSON file; the column names shown for df_sigh below are fragments
# of the first JSON line. Use sqlContext.read.json (as for df) to load this file.
df_sigh = sqlContext.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true").load(file_name)


In [21]:
# Display the column names and types of the CSV-reader result.
df_sigh

DataFrame[{ "_id" : { "$oid" : "52b213b38594d8a2be17c780" }: string,  "approvalfy" : 1999: string,  "board_approval_month" : "November": string,  "boardapprovaldate" : "2013-11-12T00:00:00Z": string,  "borrower" : "FEDERAL DEMOCRATIC REPUBLIC OF ETHIOPIA": string,  "closingdate" : "2018-07-07T00:00:00Z": string,  "country_namecode" : "Federal Democratic Republic of Ethiopia!$!ET": string,  "countrycode" : "ET": string,  "countryname" : "Federal Democratic Republic of Ethiopia": string,  "countryshortname" : "Ethiopia": string,  "docty" : "Project Information Document: string, Indigenous Peoples Plan: string, Project Information Document": string,  "envassesmentcategorycode" : "C": string,  "grantamt" : 0: string,  "ibrdcommamt" : 0: string,  "id" : "P129828": string,  "idacommamt" : 130000000: string,  "impagency" : "MINISTRY OF EDUCATION": string,  "lendinginstr" : "Investment Project Financing": string,  "lendinginstrtype" : "IN": string,  "lendprojectcost" : 550000000: string,  "maj

In [36]:
# Register df as the temporary view "world_bank" so it can be queried with SQL.
# createOrReplaceTempView is the replacement for registerTempTable, which has
# been deprecated since Spark 2.0.
df.createOrReplaceTempView("world_bank")

In [26]:
# Select three columns from the world_bank table with SQL and display the rows.
df_wb = sqlContext.sql("select _id, board_approval_month, boardapprovaldate from world_bank")
df_wb.show()

+--------------------+--------------------+--------------------+
|                 _id|board_approval_month|   boardapprovaldate|
+--------------------+--------------------+--------------------+
|[52b213b38594d8a2...|            November|2013-11-12T00:00:00Z|
|[52b213b38594d8a2...|            November|2013-11-04T00:00:00Z|
|[52b213b38594d8a2...|            November|2013-11-01T00:00:00Z|
|[52b213b38594d8a2...|             October|2013-10-31T00:00:00Z|
|[52b213b38594d8a2...|             October|2013-10-31T00:00:00Z|
|[52b213b38594d8a2...|             October|2013-10-31T00:00:00Z|
|[52b213b38594d8a2...|             October|2013-10-29T00:00:00Z|
|[52b213b38594d8a2...|             October|2013-10-29T00:00:00Z|
|[52b213b38594d8a2...|             October|2013-10-29T00:00:00Z|
|[52b213b38594d8a2...|             October|2013-10-29T00:00:00Z|
|[52b213b38594d8a2...|             October|2013-10-25T00:00:00Z|
|[52b213b38594d8a2...|             October|2013-10-25T00:00:00Z|
|[52b213b38594d8a2...|   

In [27]:
# Convert the Spark DataFrame to pandas for display.
df.toPandas()

Unnamed: 0,_id,approvalfy,board_approval_month,boardapprovaldate,borrower,closingdate,country_namecode,countrycode,countryname,countryshortname,...,sectorcode,source,status,supplementprojectflg,theme1,theme_namecode,themecode,totalamt,totalcommamt,url
0,"(52b213b38594d8a2be17c780,)",1999,November,2013-11-12T00:00:00Z,FEDERAL DEMOCRATIC REPUBLIC OF ETHIOPIA,2018-07-07T00:00:00Z,Federal Democratic Republic of Ethiopia!$!ET,ET,Federal Democratic Republic of Ethiopia,Ethiopia,...,"ET,BS,ES,EP",IBRD,Active,N,"(Education for all, 100)","[(65, Education for all)]",65,130000000,130000000,http://www.worldbank.org/projects/P129828/ethi...
1,"(52b213b38594d8a2be17c781,)",2015,November,2013-11-04T00:00:00Z,GOVERNMENT OF TUNISIA,,Republic of Tunisia!$!TN,TN,Republic of Tunisia,Tunisia,...,"BZ,BS",IBRD,Active,N,"(Other economic management, 30)","[(24, Other economic management), (54, Social ...",5424,0,4700000,http://www.worldbank.org/projects/P144674?lang=en
2,"(52b213b38594d8a2be17c782,)",2014,November,2013-11-01T00:00:00Z,MINISTRY OF FINANCE AND ECONOMIC DEVEL,,Tuvalu!$!TV,TV,Tuvalu,Tuvalu,...,TI,IBRD,Active,Y,"(Regional integration, 46)","[(47, Regional integration), (25, Administrati...",52812547,6060000,6060000,http://www.worldbank.org/projects/P145310?lang=en
3,"(52b213b38594d8a2be17c783,)",2014,October,2013-10-31T00:00:00Z,MIN. OF PLANNING AND INT'L COOPERATION,,Republic of Yemen!$!RY,RY,Republic of Yemen,"Yemen, Republic of",...,JB,IBRD,Active,N,"(Participation and civic engagement, 50)","[(57, Participation and civic engagement), (59...",5957,0,1500000,http://www.worldbank.org/projects/P144665?lang=en
4,"(52b213b38594d8a2be17c784,)",2014,October,2013-10-31T00:00:00Z,MINISTRY OF FINANCE,2019-04-30T00:00:00Z,Kingdom of Lesotho!$!LS,LS,Kingdom of Lesotho,Lesotho,...,"FH,YW,YZ",IBRD,Active,N,"(Export development and competitiveness, 30)","[(45, Export development and competitiveness),...",4145,13100000,13100000,http://www.worldbank.org/projects/P144933/seco...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
495,"(52b213b38594d8a2be17c96f,)",2013,August,2012-08-10T00:00:00Z,THE COMPETITIVENESS COMPANY,2013-08-31T00:00:00Z,Jamaica!$!JM,JM,Jamaica,Jamaica,...,"EV,AZ",IBRD,Closed,N,"(Regulation and competition policy, 50)","[(40, Regulation and competition policy), (62,...",6240,0,50000,http://www.worldbank.org/projects/P127299/tech...
496,"(52b213b38594d8a2be17c970,)",2013,August,2012-08-09T00:00:00Z,LAO PEOPLES DEMOCRATIC REPUBLIC,2012-12-31T00:00:00Z,Lao People's Democratic Republic!$!LA,LA,Lao People's Democratic Republic,Lao People's Democratic Republic,...,"YZ,JA,EZ,FZ,BC",IBRD,Closed,N,"(Child health, 14)","[(63, Child health), (49, Trade facilitation a...",65274963,20000000,20000000,http://www.worldbank.org/projects/P125298/lao-...
497,"(52b213b38594d8a2be17c971,)",2013,August,2012-08-03T00:00:00Z,GOVERNMENT OF THE REPUBLIC OF GUINEA,2014-12-31T00:00:00Z,Republic of Guinea!$!GN,GN,Republic of Guinea,Guinea,...,"AB,AH,AI",IBRD,Active,N,"(Global food crisis response, 100)","[(91, Global food crisis response)]",91,0,20000000,http://www.worldbank.org/projects/P128309/seco...
498,"(52b213b38594d8a2be17c972,)",2013,August,2012-08-02T00:00:00Z,REPUBLIC OF INDONESIA,2017-09-30T00:00:00Z,Republic of Indonesia!$!ID,ID,Republic of Indonesia,Indonesia,...,"YA,BL,AB",IBRD,Active,N,"(Rural services and infrastructure, 85)","[(78, Rural services and infrastructure), (91,...",779178,80000000,80000000,http://www.worldbank.org/projects/P117243/sust...


In [28]:
import pandas as pd

# Convert the selected columns to pandas and show the first five rows.
df_wb.toPandas().head(5)

Unnamed: 0,_id,board_approval_month,boardapprovaldate
0,"(52b213b38594d8a2be17c780,)",November,2013-11-12T00:00:00Z
1,"(52b213b38594d8a2be17c781,)",November,2013-11-04T00:00:00Z
2,"(52b213b38594d8a2be17c782,)",November,2013-11-01T00:00:00Z
3,"(52b213b38594d8a2be17c783,)",October,2013-10-31T00:00:00Z
4,"(52b213b38594d8a2be17c784,)",October,2013-10-31T00:00:00Z


In [33]:
# Count projects per region, largest count first. The table name has to match
# the temp view registered from df ("world_bank"); that is the only
# registration visible in this notebook, so "worldBank" would not resolve on a
# fresh kernel.
query = """
    select
        regionname,
        count(*) as project_count
    from world_bank
    group by regionname
    order by count(*) desc
"""

In [34]:
# Run the query and convert its result to a pandas DataFrame.
sqlContext.sql(query).toPandas()

Unnamed: 0,regionname,project_count
0,Africa,152
1,East Asia and Pacific,100
2,Europe and Central Asia,74
3,South Asia,65
4,Middle East and North Africa,54
5,Latin America and Caribbean,53
6,Other,2


In [51]:
# Sample data: five records of three integers each, distributed as an RDD.
data_list = [[1,9,11], [2,9,11], [3,7,5], [4,3,1], [5,8,10]]
rdd_data = sc.parallelize(data_list)
rdd_data.collect()

[[1, 9, 11], [2, 9, 11], [3, 7, 5], [4, 3, 1], [5, 8, 10]]

In [52]:
# Import only the schema types this cell uses instead of a wildcard import,
# so it is clear where each name comes from.
from pyspark.sql.types import StringType, StructField, StructType

# Build the DataFrame fields, approach 1: an explicit StructType schema.
# Every column is declared as a nullable StringType, even though data_list
# holds integers.
schema_string = "ID V1 V2"
fields = [StructField(field_name, StringType(), True) for field_name in schema_string.split()]
schema = StructType(fields)

# Create the Spark DataFrame: createDataFrame(data, schema)
df_list = sqlContext.createDataFrame(data_list, schema)
df_list.show()

+---+---+---+
| ID| V1| V2|
+---+---+---+
|  1|  9| 11|
|  2|  9| 11|
|  3|  7|  5|
|  4|  3|  1|
|  5|  8| 10|
+---+---+---+



In [60]:
from pyspark.sql import Row


def _to_row(values):
    """Wrap one three-element record in a Row with fields id, name and gender."""
    return Row(id=values[0], name=values[1], gender=values[2])


# Build the DataFrame fields, approach 2: map every record to a Row.
rdd_data_mapped = rdd_data.map(_to_row)
rdd_data_mapped.collect()

[Row(id=1, name=9, gender=11),
 Row(id=2, name=9, gender=11),
 Row(id=3, name=7, gender=5),
 Row(id=4, name=3, gender=1),
 Row(id=5, name=8, gender=10)]

In [61]:
# Convert the RDD of Rows to a DataFrame and display it.
rdd_data_mapped.toDF().show()

+---+----+------+
| id|name|gender|
+---+----+------+
|  1|   9|    11|
|  2|   9|    11|
|  3|   7|     5|
|  4|   3|     1|
|  5|   8|    10|
+---+----+------+

