# 10강. Spark SQL DataFrames

In [1]:
import findspark
findspark.init('/opt/homebrew/Cellar/apache-spark/3.5.1/libexec')
import pyspark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("SimpleTest").getOrCreate()
df = spark.sql('''select 'spark' as hello ''')
df.show()
#spark.stop()

24/04/03 21:11:54 WARN Utils: Your hostname, itaehun-ui-MacBookAir.local resolves to a loopback address: 127.0.0.1; using 172.24.96.75 instead (on interface en0)
24/04/03 21:11:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/03 21:11:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+-----+
|hello|
+-----+
|spark|
+-----+



##### 1) structField와 structType을 이용하여 schema를 부여하고 Dataframe을 만들어 보자

<div class="alert alert-warning"/>
- spark.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)<br>
 + data를 dataFrame으로 생성한다 schema가 정의되어 있을때 verifySchema가 True면 invalid 할 시 에러를 냄<br>
 + pyspark.sql.types 아래의 StructField로 field의 타입 이름, null 값여부를 지정하고<br>
 + 각 field들을 StructType로 묶어 schema를 만듬<br><br>
- df.show(n=20, truncate=True, vertical=False)<br>
 + df의 데이터를 주어진 n개만큼 출력한다 truncate가 True일 경우 column이 너무 많을 경우 자른다.<br>
 + vertical이 True일 경우 column의 출력형태를 변경함<br><br>
- df.printSchema()<br>
 + df의 schema 형태를 출력함<br><br>
- df.describe(*cols)<br>
 + 주어진 cols이 숫자형 컬럼이라면 기본통계를 추출함<br>

In [3]:
r1 = ['Son', 34, 'Korea']
r2 = ['Kim', 24, 'Japan']
r3 = ['Park', 14, 'China']

from pyspark.sql  import types
sf1 = types.StructField('name', types.StringType(), True)
sf2 = types.StructField('age', types.IntegerType(), True)
sf3 = types.StructField('country', types.StringType(), True)

schema = types.StructType([sf1, sf2, sf3])
rows = [r1, r2, r3]

df_first = spark.createDataFrame(rows, schema)
df_first.show()

+----+---+-------+
|name|age|country|
+----+---+-------+
| Son| 34|  Korea|
| Kim| 24|  Japan|
|Park| 14|  China|
+----+---+-------+



In [4]:
df_first.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- country: string (nullable = true)



In [5]:
df_first.describe().show()

+-------+----+----+-------+
|summary|name| age|country|
+-------+----+----+-------+
|  count|   3|   3|      3|
|   mean|NULL|24.0|   NULL|
| stddev|NULL|10.0|   NULL|
|    min| Kim|  14|  China|
|    max| Son|  34|  Korea|
+-------+----+----+-------+



<div class="alert alert-warning"/>
- rdd와 마찬가지로 take(n),  count(), collect()같은 기능과 처음 record를 볼 수 있는 head(), first() 등이 존재<br>
- df는 pyspark.sql.types.Row로 구성되어져 있음<br>
- row는 field name을 이용하여 각 field에 값에 접근 할 수 있음<br>

In [6]:
row = df_first.head()
row

Row(name='Son', age=34, country='Korea')

In [7]:
type(row)

pyspark.sql.types.Row

In [8]:
row['age'] 
# 딕셔너리 처럼 사용할 수는 있지만 할당할 수는 없음 row['age'] = 100 X

34

##### 2) word count를 Spark SQL - Dataframe을 이용하여 만들어 보자

In [9]:
rdd = spark.sparkContext.textFile('./data/wiki_wordcount.txt')
rdd.take(10)

['',
 'Word count',
 'From Wikipedia, the free encyclopedia',
 'Jump to navigationJump to search',
 'The word count is the number of words in a document or passage of text. Word counting may be needed when a text is required to stay within certain numbers of words. This may particularly be the case in academia, legal proceedings, journalism and advertising. Word count is commonly used by translators to determine the price for the translation job. Word counts may also be used to calculate measures of readability and to measure typing and reading speeds (usually in words per minute). When converting character counts to words, a measure of 5 or 6 characters to a word is generally used for English.[1]',
 '',
 '',
 'Contents',
 '1\tDetails and variations of definition',
 '2\tSoftware']

<div class="alert alert-warning"/>
- rdd.toDF(schema=None, sampleRatio=None)<br>
 + rdd는 toDF를 통하여 dataframe으로 간단히 변경 시킬 수 있음
 + schema나 sampleRatio를 이용하여 변경될 때 데이터 형식, 컬럼 이름, 변경 비율등을 설정 가능<br>
- df.groupBy(*cols)<br>
 + 주어진 컬럼 형태에 따라 groupby를 실행함. df는 reduce가 없기 때문에 groupBy + agg 연산을 결합하는 것이 일반적<br>
- df.agg(*cols)<br>
 + 주어진 groupedData에 agg용 function을 수행함.
 + from pyspark.sql import functions 참고

In [10]:
# rdd를 사용하다가 DataFrame으로 변환하고 다시 rdd를 사용해도 무방함
df = rdd.flatMap(lambda x:x.split()).map(lambda x:(x,1)).toDF(['word', 'cnt'])
df.show()

+--------------+---+
|          word|cnt|
+--------------+---+
|          Word|  1|
|         count|  1|
|          From|  1|
|    Wikipedia,|  1|
|           the|  1|
|          free|  1|
|  encyclopedia|  1|
|          Jump|  1|
|            to|  1|
|navigationJump|  1|
|            to|  1|
|        search|  1|
|           The|  1|
|          word|  1|
|         count|  1|
|            is|  1|
|           the|  1|
|        number|  1|
|            of|  1|
|         words|  1|
+--------------+---+
only showing top 20 rows



In [11]:
df.describe().show()

+-------+-----------------+----+
|summary|             word| cnt|
+-------+-----------------+----+
|  count|             1276|1276|
|   mean|            508.3| 1.0|
| stddev|889.1919513332256| 0.0|
|    min|          "Agents|   1|
|    max|            فارسی|   1|
+-------+-----------------+----+



In [12]:
df.printSchema()

root
 |-- word: string (nullable = true)
 |-- cnt: long (nullable = true)



In [13]:
df.groupBy('word').count().orderBy(["count", "word"], ascending=[0, 1]).show() # 0: 내림차순, 1: 오름차순

+-----+-----+
| word|count|
+-----+-----+
|  the|   50|
|   of|   40|
|    a|   36|
|  and|   27|
|   to|   26|
|   is|   22|
| word|   22|
|   in|   14|
|   on|   14|
|words|   14|
|  for|   12|
|  The|   10|
| Word|   10|
|   as|   10|
|count|    9|
|  may|    9|
|   or|    9|
|   be|    8|
| also|    6|
|   by|    6|
+-----+-----+
only showing top 20 rows



In [16]:
from pyspark.sql import functions as func
from pyspark.sql import Column
df.groupBy('word').agg(func.count(df.cnt).alias('count2')).orderBy(["count2", "word"], ascending=[0, 1]).show()

+-----+------+
| word|count2|
+-----+------+
|  the|    50|
|   of|    40|
|    a|    36|
|  and|    27|
|   to|    26|
|   is|    22|
| word|    22|
|   in|    14|
|   on|    14|
|words|    14|
|  for|    12|
|  The|    10|
| Word|    10|
|   as|    10|
|count|     9|
|  may|     9|
|   or|     9|
|   be|     8|
| also|     6|
|   by|     6|
+-----+------+
only showing top 20 rows



##### 3) DataFrameReader/DataFrameWriter를 이용하여 입출력을 수행하여보자
- 외부 데이터를 df로 읽거나 쓰기 위해서 read/write 메서드를 지원함
- read 순서
 + read 메서드 호출 -> DataFrameReader 인스턴스 생성
 + format 메서드를 이용하여 read되는 데이터 소스 유형을 지정 (text, csv, json, jdbc, kafka,parquet, console, socket 등)
 + option format 별 상세 옵션 지정
 + load 메서드로 실제 df를 생성

In [15]:
df_wordcnt = spark.read.format('text').load('./data/wiki_wordcount.txt')
df_wordcnt.show(5)

+--------------------+
|               value|
+--------------------+
|                    |
|          Word count|
|From Wikipedia, t...|
|Jump to navigatio...|
|The word count is...|
+--------------------+
only showing top 5 rows



In [17]:
# column명을 변경
df_wordcnt2 = df_wordcnt.withColumnRenamed("value", "text")
df_wordcnt2.show(5)

+--------------------+
|                text|
+--------------------+
|                    |
|          Word count|
|From Wikipedia, t...|
|Jump to navigatio...|
|The word count is...|
+--------------------+
only showing top 5 rows



In [18]:
df_wordcnt3 = df_wordcnt2.withColumnRenamed("text", "a")
df_wordcnt3.show(5)

+--------------------+
|                   a|
+--------------------+
|                    |
|          Word count|
|From Wikipedia, t...|
|Jump to navigatio...|
|The word count is...|
+--------------------+
only showing top 5 rows



In [19]:
df_first.show()

+----+---+-------+
|name|age|country|
+----+---+-------+
| Son| 34|  Korea|
| Kim| 24|  Japan|
|Park| 14|  China|
+----+---+-------+



- write 순서
 + write 메서드 호출 -> DataFrameWriter 인스턴스 생성
 + format 메서드를 이용하여 read되는 데이터 소스 유형을 지정 (text, csv, json, jdbc, kafka,parquet, console, socket 등)
 + option format 별 상세 옵션 지정
 + save 메서드로 실제 파일을 생성

In [20]:
df_first.write.format('csv').mode("overwrite").save('./data/df_first.csv')
df_first.write.format('json').mode("overwrite").save('./data/df_first.json')

<div class="alert alert-warning"/>
- spark.createDataFrame(*cols)<br>
 + 해당 메서드를 이용하여 pandas dataframe을 바로 spark dataframe으로 변경하는 것도 가능

In [24]:
import pandas as pd
df_pd = pd.DataFrame([("foo", 1), ("bar", 2)], columns=("k", "v"))
sdf = spark.createDataFrame(df_pd)
sdf.show()
df_pd

+---+---+
|  k|  v|
+---+---+
|foo|  1|
|bar|  2|
+---+---+



Unnamed: 0,k,v
0,foo,1
1,bar,2


##### 4) DataFrame의 비타입 연산(untyped operation)(API)을 이용한 word count 
- dataFrame의 연산을 일반적으로 비타입연산이라고 부름
- 비타입연산은 각각의 row에 대하여 다시 지정된 col의 형태로 로직이 처리되는 형태를 보임

<div class="alert alert-warning"/>
- df.select(*cols)<br>
 + df에서 지정된 cols를 반환함<br>
 + 비타입연산 func을 이용하여 col의 value를 변환시킬 수 있음<br>
 + col은 문자열이외에도 pyspark.sql.functions.col을 이용하여 반환 될 수 있음<br>
 + pyspark.sql.functions.col은 pyspark.sql.Column을 생성함.<br>
 + 해당 인스턴스는 실제 col에 대한 정보를 변경 할 수 있음<br>
- functions.split(str, sep)<br>
 + 주어진 문자열을 주어진 sep으로 분리하여 배열 형태로 저장
- explode(col)<br>
 + 주어진 컬럼안의 배열을 각 row로 분리하여 저장

In [25]:
a = df_first.select(['age', 'country'])
a.withColumnRenamed('country', 'cty')

DataFrame[age: int, cty: string]

In [26]:
a.show()

+---+-------+
|age|country|
+---+-------+
| 34|  Korea|
| 24|  Japan|
| 14|  China|
+---+-------+



In [27]:
from pyspark.sql import Column
df_first.select([func.col('country'), func.col('age')]).show(5)

+-------+---+
|country|age|
+-------+---+
|  Korea| 34|
|  Japan| 24|
|  China| 14|
+-------+---+



In [28]:
from pyspark.sql import Column
df_first.select([func.col('country').alias('cty'), func.col('age')]).show(5)

+-----+---+
|  cty|age|
+-----+---+
|Korea| 34|
|Japan| 24|
|China| 14|
+-----+---+



In [30]:
df = spark.createDataFrame([['a b c d e f']])
df.show()
df.select(func.split(func.col('_1'), ' ').alias('word')).show()

+-----------+
|         _1|
+-----------+
|a b c d e f|
+-----------+

+------------------+
|              word|
+------------------+
|[a, b, c, d, e, f]|
+------------------+



In [31]:
spark.createDataFrame([['a b c d e f']]).show()

+-----------+
|         _1|
+-----------+
|a b c d e f|
+-----------+



In [32]:
# explode : 행을 열로 변환
df = spark.createDataFrame([['a b c d e f']])
df.select(func.explode(func.split(func.col('_1'), ' '))).show()

+---+
|col|
+---+
|  a|
|  b|
|  c|
|  d|
|  e|
|  f|
+---+



In [33]:
df.rdd.collect() # 비싼 연산은 아님

[Row(_1='a b c d e f')]

### df_wordcnt 를 이용하여 word count를 Dataframe 명령어로 구현해보자
- 정렬은 count 내림 차순 동일한 count를 가진 경우 word 오름차순으로 한다

In [34]:
df1 = df_wordcnt.select(func.explode(func.split(func.col('value'), ' ')).alias('word'))
df2 = df1.groupBy('word').count().orderBy(['count', 'word'], ascending=[0, 1])
df2.show(5)

+----+-----+
|word|count|
+----+-----+
| the|   50|
|  of|   40|
|   a|   36|
| and|   27|
|  to|   26|
+----+-----+
only showing top 5 rows



##### 5) Spark SQL을 이용하여 dataframe을 sql을 이용하여 컨트롤 해보자 (아주 중요한 개념)
- df는 바로 sql문을 이용하여 처리 될 수 없음
- df.createOrReplaceTempView라는 메서드를 통하여 임시의 테이블로 등록된 후 sql로 처리 될 수 있음

<div class="alert alert-warning"/>
- df.createOrReplaceTempViewe(name)<br>
 + df를 주어진 name의 임시 테이블로 생성<br>
 + 해당 테이블은 해당 메서드를 실행한 스파크 세션이 유지되는 동안만 유효함<br>
- df.createOrReplaceGlobalTempView(name) <- 다른 사람과의 충돌이 일어날 수 있기 떄문에 잘 사용하지는 않음<br>
 + df를 주어진 name의 글로벌 테이블로 생성<br>
 + 해당 테이블은 전역적인 스파크 세션이 유지되는 동안 유효함<br>
- explain()
 + spark.sql의 실행 계획을 출력함<br>

In [35]:
df2.createOrReplaceTempView('wordcnt') # wordcnt라는 이름으로 임시 테이블 생성

In [36]:
df2.show()

+-----+-----+
| word|count|
+-----+-----+
|  the|   50|
|   of|   40|
|    a|   36|
|  and|   27|
|   to|   26|
|     |   25|
|   is|   22|
| word|   22|
|   in|   14|
|   on|   14|
|words|   14|
|  for|   12|
|  The|   10|
|   as|   10|
| Word|    9|
|count|    9|
|  may|    9|
|   or|    9|
|   be|    8|
| also|    6|
+-----+-----+
only showing top 20 rows



In [37]:
# 우리가 아는 일반적인 SQL문을 사용할 수 있음
spark.sql('select * from wordcnt').show()

+-----+-----+
| word|count|
+-----+-----+
|  the|   50|
|   of|   40|
|    a|   36|
|  and|   27|
|   to|   26|
|     |   25|
|   is|   22|
| word|   22|
|   in|   14|
|   on|   14|
|words|   14|
|  for|   12|
|  The|   10|
|   as|   10|
| Word|    9|
|count|    9|
|  may|    9|
|   or|    9|
|   be|    8|
| also|    6|
+-----+-----+
only showing top 20 rows



In [38]:
spark.sql('select * from wordcnt where word == "words"').explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#740L DESC NULLS LAST, word#736 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#740L DESC NULLS LAST, word#736 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=689]
      +- HashAggregate(keys=[word#736], functions=[count(1)])
         +- Exchange hashpartitioning(word#736, 200), ENSURE_REQUIREMENTS, [plan_id=686]
            +- HashAggregate(keys=[word#736], functions=[partial_count(1)])
               +- Filter (word#736 = words)
                  +- Generate explode(split(value#558,  , -1)), false, [word#736]
                     +- FileScan text [value#558] Batched: false, DataFilters: [], Format: Text, Location: InMemoryFileIndex(1 paths)[file:/Users/itaehun/python/빅실무/data/wiki_wordcount.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>




In [39]:
spark.sql('select * from wordcnt where word == "words"').show()

+-----+-----+
| word|count|
+-----+-----+
|words|   14|
+-----+-----+



##### 6) udf (user define function) 을 이용하여 custom function을 수행해보자

<div class="alert alert-warning"/>
- pyspark.sql.functions.udf(f=None, returnType=StringType) <- returnType=StringType을 꼭 지정 시켜야함<br>
 + f에서 수행될 function, returnType에서 pyspark.sql.types의 타입 중 수행후 반환될 타입을 지정히여 udf를 생성할 수 있음<br>
 + 해당 udf는 dataframe에서는 바로 사용 될 수 있지만, spark.sql 에서는 사용될 수 없음
- spark.udf.register(name, f=None, returnType=StringType)<br>
 + 해당 등록 과정을 통하여 spark.sql안에서도 udf를 사용할 수 있음, 지정된 name으로 지정한 f가 수행 됨
- df.withColumn(colName, col)<br>
 + 지정된 colName의 값들을 col에 지정된 형태로 변경함<br>
 + colName이 신규이면 새로운 col을 생성함<br>
- Column.cast(type)<br>
 + 주어진 컬럼의 데이터 타입을 type으로 변경함

In [40]:
array_sum = func.udf(lambda arr: sum(arr), types.IntegerType())

In [41]:
df = spark.createDataFrame([['1,2,3,4,5,6']])
df = df.select(func.split('_1', ',').alias('val'))
df = df.withColumn("val", df.val.cast("array<integer>")) # cast : 데이터 타입 변경
df.createOrReplaceTempView('test')
df.show()
df.printSchema()

+------------------+
|               val|
+------------------+
|[1, 2, 3, 4, 5, 6]|
+------------------+

root
 |-- val: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [42]:
spark.udf.register('array_sum', array_sum)

<pyspark.sql.udf.UserDefinedFunction at 0x144d2fb20>

In [43]:
spark.sql('select *, array_sum(val) from test').show()

+------------------+--------------+
|               val|array_sum(val)|
+------------------+--------------+
|[1, 2, 3, 4, 5, 6]|            21|
+------------------+--------------+



In [44]:
spark.sql('select *, array_sum(val) as sum_val from test').show()

+------------------+-------+
|               val|sum_val|
+------------------+-------+
|[1, 2, 3, 4, 5, 6]|     21|
+------------------+-------+



## 그외 Spark SQL 명령어는 너무나도 많기 때문에 다음 주소를 참조

https://spark.apache.org/docs/latest/sql-programming-guide.html