In [1]:
import subprocess
import os
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pandas as pd

# select files in '/example' in hdfs

cmd = "hdfs dfs -ls /example"
files = subprocess.check_output(cmd, shell=True).decode("utf-8")
print("* files in '/example' in hdfs :", "\n")
print(files)

# define SparkSession

sc = SparkContext()
spark = SparkSession.builder.master("yarn").appName("practice").getOrCreate()
print("* SparkContext :")
display(sc)
print("* SparkSession :")
display(spark)

* files in '/example' in hdfs : 

Found 7 items
-rw-r--r--   1 hadoop supergroup  234052199 2020-10-26 23:46 /example/2008.csv
-rw-r--r--   1 hadoop supergroup      23312 2020-10-19 19:17 /example/LICENSE
-rw-r--r--   1 hadoop supergroup       4488 2021-10-17 13:02 /example/README.md
-rw-r--r--   1 hadoop supergroup       1366 2020-10-18 23:18 /example/README.txt
-rw-r--r--   1 hadoop supergroup      11411 2022-08-20 10:24 /example/airport-codes-na.txt
-rw-r--r--   1 hadoop supergroup   33396236 2022-08-20 10:23 /example/departuredelays.csv
-rw-r--r--   1 hadoop supergroup   87573298 2020-10-27 11:34 /example/videodata.json

* SparkContext :


* SparkSession :


In [2]:
# read video data (json) and define view for video data

json_of_video_history  = spark.read.json("/example/videodata.json")
view_of_video_history = json_of_video_history.createOrReplaceTempView("video_history")

# print schema of video_history

print("* schema of video_history :", "\n")
json_of_video_history.printSchema()

# print count of video_history

count_of_video_history = spark.sql("""

                                                                SELECT COUNT(*) AS CNT
                                                                FROM   video_history

                                                        """)

print("* count of video_history :")
count_of_video_history = count_of_video_history.toPandas()
display(count_of_video_history)
print("\n")

# print count of open_and_finish_incomplete_by_show

count_of_open_and_finish_incomplete_by_show = spark.sql("""
                                                                
                                                                SELECT  A.SHOW_ID,
                                                                              A.CNT_OPEN,
                                                                              B.CNT_INCOMPLETE
                                                                FROM   (
                                                                               SELECT       SHOW_ID,
                                                                                                  COUNT(1) AS CNT_OPEN
                                                                               FROM         video_history
                                                                               WHERE       STATE = 'open'
                                                                               GROUP BY  SHOW_ID
                                                                ) A
                                                                LEFT OUTER JOIN (
                                                                               SELECT       SHOW_ID,
                                                                                                  COUNT(1) AS CNT_INCOMPLETE
                                                                               FROM         video_history
                                                                               WHERE       STATE = 'finish_incomplete'
                                                                               GROUP BY  SHOW_ID
                                                                ) B
                                                                ON A.SHOW_ID = B.SHOW_ID

                                                        """)

print("* count of open_and_finish_incomplete_by_show :")
count_of_open_and_finish_incomplete_by_show = count_of_open_and_finish_incomplete_by_show.toPandas()
display(count_of_open_and_finish_incomplete_by_show)

* schema of video_history : 

root
 |-- customer_id: long (nullable = true)
 |-- show_id: long (nullable = true)
 |-- state: string (nullable = true)
 |-- timestamp: long (nullable = true)

* count of video_history :


Unnamed: 0,CNT
0,1166450




* count of open_and_finish_incomplete_by_show :


Unnamed: 0,SHOW_ID,CNT_OPEN,CNT_INCOMPLETE
0,26,635,177
1,29,676,158
2,474,647,186
3,964,666,166
4,65,694,190
...,...,...,...
995,458,651,178
996,739,705,150
997,211,671,160
998,469,642,162


In [6]:
# read departure delays of airplane(csv) and define view for departure_delays

csv_of_departure_delays   = spark.read.csv("/example/departuredelays.csv", header = True)
view_of_departure_delays = csv_of_departure_delays.createOrReplaceTempView("departure_delays")

# print schema of departure_delays

print("* schema of departure_delays :", "\n")
csv_of_departure_delays.printSchema()

# change delay column's type from string to int

csv_of_departure_delays = csv_of_departure_delays.withColumn("delay",csv_of_departure_delays["delay"].cast(pyspark.sql.types.IntegerType()))
print("* schema of departure_delays after changing delay column's type :", "\n")
csv_of_departure_delays.printSchema()

# print count of departure_delays

count_of_departure_delays = spark.sql("""

                                                                    SELECT COUNT(*) AS CNT
                                                                    FROM   departure_delays

                                                              """)

print("* count of departure_delays :")
count_of_departure_delays = count_of_departure_delays.toPandas()
display(count_of_departure_delays)
print("\n")

# print top_10_of_delay

top_10_of_delay = spark.sql("""

                                                       SELECT X.*
                                                       FROM (        
                                                            SELECT  *,
                                                                          ROW_NUMBER() OVER(ORDER BY DELAY DESC) AS RNK
                                                            FROM   departure_delays
                                                       ) X
                                                       WHERE X.RNK <= 10

                                                """)
top_10_of_delay = top_10_of_delay.toPandas()
print("* top_10_of_delay :")
display(top_10_of_delay)

# insert top_10_of_delay in hdfs
top_10_of_delay.to_csv("top_10_of_delay.csv", index = False)
cmd = "hadoop fs -copyFromLocal top_10_of_delay.csv /result_of_spark"
files = subprocess.check_output(cmd, shell=True)

* schema of departure_delays : 

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)

* schema of departure_delays after changing delay column's type : 

root
 |-- date: string (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)

* count of departure_delays :


Unnamed: 0,CNT
0,1391578




* top_10_of_delay :


Unnamed: 0,date,delay,distance,origin,destination,RNK
0,1090600,995,462,SMF,SLC,1
1,3191420,994,1590,SJC,ORD,2
2,1200645,993,525,MOT,DEN,3
3,1201749,99,741,ALB,ATL,4
4,1061635,99,516,ATL,DTW,5
5,1061518,99,1042,ATL,DEN,6
6,1062008,99,586,ATL,TUL,7
7,1071455,99,660,ATL,JFK,8
8,1071355,99,788,ATL,MSP,9
9,1072140,99,648,ATL,EWR,10
