## Google Drive 연동

In [None]:
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Spark 설치

In [None]:
!apt-get install openjdk-8-jdk-headless
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar -zxf spark-3.5.1-bin-hadoop3.tgz

import os
import findspark
import pyspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

!pip install findspark -q

findspark.init()

spark_version = pyspark.__version__
print("Apache Spark 버전 확인: " + spark_version)

## Spark
- RDD : 다수의 서버에 분산 방식으로 저장함

In [None]:
from pyspark.sql import SparkSession

# Spark 세션 활성화
my_spark = SparkSession.builder.getOrCreate()
my_spark

- 데이터베이스 확인

In [None]:
my_spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/content/spark-warehouse')]

- Spark SQL 쿼리 실행 (Database 보여주기)

In [None]:
my_spark.sql('show databases').show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [None]:
my_spark.catalog.currentDatabase()

'default'

In [None]:
# 기존 Spark 세션 종료
my_spark.stop()

In [None]:
# 새로운 Spark 세션 시작
my_spark = SparkSession.builder.master("local[1]").appName("SampleTutorial").getOrCreate()
rdd_sample = my_spark.sparkContext.parallelize([1, 2, 3, 4, 5])
print(type(rdd_sample))

<class 'pyspark.rdd.RDD'>


In [None]:
rdd_sample.take(num=2)

[1, 2]

## CSV 파일 불러오기

In [None]:
DATA_PATH = '/content/drive/MyDrive/Colab Notebooks/data/flight_small.csv'
flights = my_spark.read.option('header', 'true').csv(DATA_PATH)
flights.show(2)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 2 rows



- flights 데이터프레임을 default 데이터베이스에 추가

In [None]:
flights.createOrReplaceTempView('flights')

- default 데이터베이스에 데이터가 추가가 되었는지 확인

In [None]:
my_spark.catalog.listTables('default')

[Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='temp', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

- 데이터 조회할 때, SQL을 통해서 조회가 가능하다.


In [None]:
my_spark.sql('SHOW TABLES FROM default').show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |  flights|       true|
|         |     temp|       true|
+---------+---------+-----------+



In [18]:
query = 'SELECT * FROM flights LIMIT 10'
# my_spark.sql(query).show()

flights10 = my_spark.sql(query)
flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

- origin, dest GroupBy 연산 데이터갯수 확인
- pandas 데이터프레임으로 변환 (메서드 찾아보기)
- (~10:25)

In [19]:
query = """
  SELECT origin, dest, COUNT(*) AS CNT
  FROM flights
  GROUP BY origin, dest
"""

result_spark = my_spark.sql(query)
# result.show()

# pandas 데이터프레임으로 변환
result_pd = result_spark.toPandas()
result_pd.head()

Unnamed: 0,origin,dest,CNT
0,SEA,RNO,8
1,SEA,DTW,98
2,SEA,CLE,2
3,SEA,LAX,450
4,PDX,SEA,144


In [20]:
# Pandas 데이터프레임에서 Spark로 변환
import pandas as pd
import numpy as np

# Generate a pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = my_spark.createDataFrame(pdf)
df.show()

+--------------------+-------------------+-------------------+
|                   0|                  1|                  2|
+--------------------+-------------------+-------------------+
| 0.27734755961853164| 0.3956112790786591|0.05169411004881819|
|  0.3372387974757679| 0.5017772610973991| 0.5476887903185194|
|  0.9846146619844656|   0.97052033763838| 0.8104730249627128|
|  0.2707138148851289| 0.3698957252856986| 0.3895376386703415|
|     0.5057316383234| 0.9975817529549396|0.21861919530041607|
|  0.6104085369687174| 0.2099185880684873| 0.5198351939057642|
|  0.8978474700673802|0.36523817192570507| 0.5748198919545531|
|  0.9202300527236342| 0.6546306869588676| 0.8146743989595295|
|  0.0527512456399315| 0.7006347404501069| 0.5475826460323931|
|  0.5277527124001207| 0.8561268418961051| 0.7135403132597933|
| 0.09976788304717799| 0.5111740448214537| 0.5918618049835165|
| 0.04438974367656401| 0.9386347058168836| 0.8480077131000381|
|  0.8407083705831392| 0.8688729591996885|0.45963404010

In [21]:
# my_spark.stop()

## 정리할 시간
- 첫번째 : Spark 세션 생성
- 두번째 : 임의의 pandas 데이터프레임 생성
- 세번째 : Spark 데이터프레임으로 변환
- 네번째 : 변환된 데이터프레임을 데이터베이스에 추가
- 마지막 : listTables() 확인

## Chapter 3장

In [22]:
# Databricks notebook source
# MAGIC
# MAGIC %md
# MAGIC # Example 3.7

# from pyspark.sql.types import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, expr, when, concat, lit
from pyspark.sql import SparkSession

# define schema for our data
schema = (StructType([
   StructField("Id", IntegerType(), False),
   StructField("First", StringType(), False),
   StructField("Last", StringType(), False),
   StructField("Url", StringType(), False),
   StructField("Published", StringType(), False),
   StructField("Hits", IntegerType(), False),
   StructField("Campaigns", ArrayType(StringType()), False)]))

ddl_schema = "`Id` INT,`First` STRING,`Last` STRING,`Url` STRING,`Published` STRING,`Hits` INT,`Campaigns` ARRAY<STRING>"

# create our data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
       [2, "Brooke","Wenig","https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
       [3, "Denny", "Lee", "https://tinyurl.3","6/7/2019",7659, ["web", "twitter", "FB", "LinkedIn"]],
       [4, "Tathagata", "Das","https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
       [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
       [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
      ]

# create a DataFrame using the schema defined above
spark = SparkSession.builder.master("local[1]").appName("SampleTutorial").getOrCreate()

blogs_df = spark.createDataFrame(data, ddl_schema)
blogs_df.show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [23]:
blogs_df2 = spark.createDataFrame(data, schema)
blogs_df2.show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [24]:
blogs_df2.printSchema()

root
 |-- Id: integer (nullable = false)
 |-- First: string (nullable = false)
 |-- Last: string (nullable = false)
 |-- Url: string (nullable = false)
 |-- Published: string (nullable = false)
 |-- Hits: integer (nullable = false)
 |-- Campaigns: array (nullable = false)
 |    |-- element: string (containsNull = true)



In [25]:
# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = my_spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(my_spark.catalog.listTables())

# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("temp")

# Examine the tables in the catalog again
print(my_spark.catalog.listTables())

[Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
[Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


In [26]:
# default DB에 테이블이 2개 존재
# 그 중에서 내가 원하는 테이블 취사 선택하는 예제
flights_2 = my_spark.table('flights')
flights_2.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

## Spark 문법 활용 데이터 가공

In [27]:
# 데이터 컬럼 추가
# result = flights_2.
# flights_2['새로운'] = flights_2['air_time'] / 60
flights_2 = flights_2.withColumn("duration_hrs", flights_2.air_time/60)
flights_2.show(1)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|         2.2|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
only showing top 1 row



- 데이터를 필터링하는 코드 작성

In [28]:
result = flights_2.filter("distance >= 1000")
result.show(1)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|         6.0|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
only showing top 1 row



In [29]:
result2 = flights_2.filter(flights_2.distance > 1000)
result2.show(1)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|         6.0|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
only showing top 1 row



In [30]:
# 변수 선택, tailnum, origin, dest
result3 = flights_2.select("tailnum", "origin", "dest")
result3.show(1)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
+-------+------+----+
only showing top 1 row



In [31]:
result4 = flights_2.select(flights_2.tailnum, flights_2.origin, flights_2.dest)
result4.show(1)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
+-------+------+----+
only showing top 1 row



In [32]:
# 다중 필터 조건 걸어보기
filterA = result4.origin == "SEA"
filterB = result4.dest == "LAX"



In [33]:
# avg_speed
avg_speed = (flights_2.distance/(flights_2.air_time/60)).alias("avg_speed")
speed_df = flights.select("origin", "dest", "tailnum", avg_speed)
speed_df.show()

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
|   PDX| DEN| N646SW|491.40495867768595|
|   PDX| OAK| N422WN|             362.0|
|   SEA| SFO| N361VA| 415.7142857142857|
|   SEA| SAN| N309AS| 466.6666666666667|
|   SEA| ORD| N564AS| 521.5151515151515|
|   SEA| LAX| N323AS| 440.3076923076923|
|   SEA| PHX| N305AS|431.29870129870125|
|   SEA| LAS| N433AS| 409.6062992125984|
|   SEA| ANC| N765AS|474.75409836065575|
|   SEA| SFO| N713AS| 315.8139534883721|
|   PDX| SFO| N27205| 366.6666666666667|
|   SEA| SMF| N626AS|477.63157894736844|
|   SEA| MDW| N8634A|481.38888888888886|
|   SEA| BOS| N597AS| 516.4137931034483|
|   PDX| BUR| N215AG| 441.6216216216216|
+------+----+-------+------------------+
only showing top

In [34]:
speed_df2 = flights.selectExpr("origin", 'dest', 'tailnum', 'distance/(air_time/60) AS avg_speed')
speed_df2.show()

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
|   PDX| DEN| N646SW|491.40495867768595|
|   PDX| OAK| N422WN|             362.0|
|   SEA| SFO| N361VA| 415.7142857142857|
|   SEA| SAN| N309AS| 466.6666666666667|
|   SEA| ORD| N564AS| 521.5151515151515|
|   SEA| LAX| N323AS| 440.3076923076923|
|   SEA| PHX| N305AS|431.29870129870125|
|   SEA| LAS| N433AS| 409.6062992125984|
|   SEA| ANC| N765AS|474.75409836065575|
|   SEA| SFO| N713AS| 315.8139534883721|
|   PDX| SFO| N27205| 366.6666666666667|
|   SEA| SMF| N626AS|477.63157894736844|
|   SEA| MDW| N8634A|481.38888888888886|
|   SEA| BOS| N597AS| 516.4137931034483|
|   PDX| BUR| N215AG| 441.6216216216216|
+------+----+-------+------------------+
only showing top

## 집계 함수
- groupby()

In [35]:
flights_2.printSchema()

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [36]:
# distance String -> Int
flights_2 = flights_2.withColumn('distance', flights_2.distance.cast('int'))
flights_2.printSchema()

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [38]:
# origin, PDX 추출, groupby, distance 최소
flights_2.filter(flights_2.origin == "PDX").groupby('tailnum').min('distance').show()

+-------+-------------+
|tailnum|min(distance)|
+-------+-------------+
| N442AS|          569|
| N36472|          550|
| N567AA|         1616|
| N38451|         1739|
| N516UA|          991|
| N927DN|         1426|
| N954WN|          543|
| N73283|          550|
| N102UW|         2282|
| N607AS|          550|
| N622SW|          569|
| N584AS|          933|
| N914WN|          991|
| N445WN|          543|
| N3LDAA|         1739|
| N389HA|         2603|
| N578SW|          106|
| N430WN|          479|
| N651SW|         1009|
| N611SW|          763|
+-------+-------------+
only showing top 20 rows



In [41]:
# carrier
flights_2.filter(flights_2.origin == "PDX").filter(flights_2.carrier == "DL").groupby("dest").avg("distance").show()

+----+-------------+
|dest|avg(distance)|
+----+-------------+
| ATL|       2172.0|
| DTW|       1953.0|
| MSP|       1426.0|
| JFK|       2454.0|
| SLC|        630.0|
+----+-------------+



In [45]:
import pyspark.sql.functions as F

flights_2= flights_2.withColumn('dep_delay', flights_2.distance.cast('int'))
by_month_dest = flights_2.groupBy("month", "dest")
by_month_dest.avg("dep_delay").show()

+-----+----+------------------+
|month|dest|    avg(dep_delay)|
+-----+----+------------------+
|   11| TUS|1183.6666666666667|
|   11| ANC|1453.5294117647059|
|    1| BUR|             877.0|
|    1| PDX|             129.0|
|    6| SBA|             877.0|
|    5| LAX| 919.7142857142857|
|   10| DTW|            1927.0|
|    6| SIT|             861.0|
|   10| DFW|1647.0588235294117|
|    3| FAI|            1533.0|
|   10| SEA|             129.0|
|    2| TUS|            1119.0|
|   12| OGG| 2625.818181818182|
|    9| DFW|1645.3333333333333|
|    5| EWR|2404.6666666666665|
|    3| RDM|             116.0|
|    8| DCA|            2335.3|
|    7| ATL|2178.2162162162163|
|    4| JFK|2431.1428571428573|
|   10| SNA| 962.1333333333333|
+-----+----+------------------+
only showing top 20 rows



In [46]:
by_month_dest.agg(F.stddev("dep_delay")).show()

+-----+----+------------------+
|month|dest| stddev(dep_delay)|
+-----+----+------------------+
|   11| TUS|  56.0029761113937|
|   11| ANC|22.450261919397587|
|    1| BUR| 61.55870112510924|
|    1| PDX|               0.0|
|    6| SBA| 62.00000000000001|
|    5| LAX| 54.56595650240686|
|   10| DTW|               0.0|
|    6| SIT|              NULL|
|   10| DFW|20.349880762766734|
|    3| FAI|               0.0|
|   10| SEA|               0.0|
|    2| TUS|               0.0|
|   12| OGG|31.552553563279734|
|    9| DFW|21.096385265356872|
|    5| EWR| 9.237604307033997|
|    3| RDM|               0.0|
|    8| DCA|10.143963722332614|
|    7| ATL| 4.916723926983699|
|    4| JFK|15.001831390031716|
|   10| SNA| 41.87202725766531|
+-----+----+------------------+
only showing top 20 rows



테이블 조인

In [47]:
my_spark.stop()

In [50]:
from pyspark.sql import SparkSession
my_spark = SparkSession.builder.master("local[1]").appName("SampleTutorial").getOrCreate()
DATA_PATH = '/content/drive/MyDrive/Colab Notebooks/data/'
airports = my_spark.read.csv(DATA_PATH + "airports.csv", header = True)
flights = my_spark.read.csv(DATA_PATH + "flight_small.csv", header = True)
planes = my_spark.read.csv(DATA_PATH + "planes.csv", header = True)

airports.show(1)
flights.show(1)
planes.show(1)

+---+-----------------+----------+-----------+----+---+---+
|faa|             name|       lat|        lon| alt| tz|dst|
+---+-----------------+----------+-----------+----+---+---+
|04G|Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
+---+-----------------+----------+-----------+----+---+---+
only showing top 1 row

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 1 row

+-------+----+--------------------+----------------+

In [52]:
# 컬럼명 변경
airports = airports.withColumnRenamed("faa", "dest")

In [53]:
result = flights.join(airports, on = 'dest', how = 'leftouter')
result.show()

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+--------------------+---------+-----------+----+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|                name|      lat|        lon| alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+--------------------+---------+-----------+----+---+---+
| LAX|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA|     132|     954|   6|    58|    Los Angeles Intl|33.942536|-118.408075| 126| -8|  A|
| HNL|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA|     360|    2677|  10|    40|       Honolulu Intl|21.318681|-157.922428|  13|-10|  N|
| SFO|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA|     111|     679|  14|    43| 

In [55]:
my_spark.stop()

## zip 파일 데이터 불러오기

In [56]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("Sample").getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7af7ac4a08b0>


In [57]:
# CSV 파일 불러오기
DATA_PATH = "/content/drive/MyDrive/Colab Notebooks/data/"
aa_dfw_2014 = spark.read.format("csv").options(Header=True).load(DATA_PATH + 'AA_DFW_2014_Departures_Short.csv.gz')
aa_dfw_2014.show(1)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2014|         0005|                HNL|                          519|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 1 row



In [58]:
import pyspark.sql.functions as F

aa_dfw_2014 = aa_dfw_2014.withColumn('airport', F.lower(aa_dfw_2014['Destination Airport']))
aa_dfw_2014.show(2)

+-----------------+-------------+-------------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-------------------+-----------------------------+-------+
|       01/01/2014|         0005|                HNL|                          519|    hnl|
|       01/01/2014|         0007|                OGG|                          505|    ogg|
+-----------------+-------------+-------------------+-----------------------------+-------+
only showing top 2 rows



In [59]:
# 특정 컬럼 삭제
aa_dfw_2014 = aa_dfw_2014.drop(aa_dfw_2014['Destination Airport'])
aa_dfw_2014.show(1)

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2014|         0005|                          519|    hnl|
+-----------------+-------------+-----------------------------+-------+
only showing top 1 row



In [60]:
# .parquet
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import pyarrow as pa

df = pd.DataFrame({'one': [-1, 100, 2.5],
                   'two': ['foo', 'bar', 'baz'],
                   'three': [True, False, True]}, index=list('abc'))

table = pa.Table.from_pandas(df)
pq.write_table(table, DATA_PATH + 'example.parquet')


- 데이터 불러오기

In [61]:
example_df = spark.read.parquet(DATA_PATH + 'example.parquet')
example_df.show(1)

+----+---+-----+-----------------+
| one|two|three|__index_level_0__|
+----+---+-----+-----------------+
|-1.0|foo| true|                a|
+----+---+-----+-----------------+
only showing top 1 row



In [62]:
voter_df = spark.read.format('csv').options(Header=True).load(DATA_PATH + 'DallasCouncilVoters.csv.gz')
voter_df.show()

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember|       Scott Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember| Rickey D. Callahan|
|01/11/2017|Councilmember|  Jennifer S. Gates|
|04/25/2018|C

In [64]:
# 중복값 제거
voter_df.select(voter_df['VOTER_NAME']).distinct().show(10)

+--------------------+
|          VOTER_NAME|
+--------------------+
|      Tennell Atkins|
|  the  final   20...|
|        Scott Griggs|
|       Scott  Griggs|
|       Sandy Greyson|
| Michael S. Rawlings|
| the final 2018 A...|
|        Kevin Felder|
|        Adam Medrano|
|       Casey  Thomas|
+--------------------+
only showing top 10 rows



### 문제
- 객체명 : result
- 메서드 filter() 사용
- length()를 활용해서 0보다 크고, 20보다 작은 VOTER_NAME 가져온다.

In [71]:
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')
voter_df.select('VOTER_NAME').distinct().show(10, truncate=False)

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Kevin Felder       |
|Adam Medrano       |
|Casey  Thomas      |
|011018__42         |
|Mark  Clayton      |
+-------------------+
only showing top 10 rows



밑줄이 그어진 행 '011018__42'는 제거를 하자.

In [74]:
voter_df = voter_df.filter(~F.col('VOTER_NAME').contains('_'))
voter_df.select('VOTER_NAME').distinct().show(10, truncate=False)

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Kevin Felder       |
|Adam Medrano       |
|Casey  Thomas      |
|Mark  Clayton      |
|Casey Thomas       |
+-------------------+
only showing top 10 rows

