In [0]:
import pandas as pd

In [0]:

# Read the yearly flight summaries with pandas, then convert them to a Spark DataFrame.
FLIGHT_CSV_URL_TEMPLATE = (
  "https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/"
  "master/data/flight-data/csv/{year}-summary.csv"
)

# Collect one frame per year and concatenate once: calling pd.concat inside the
# loop re-copies every previously read row on each iteration.
# ignore_index=True replaces the overlapping per-file indexes with 0..n-1.
yearly_frames = [
  pd.read_csv(FLIGHT_CSV_URL_TEMPLATE.format(year=year))
  for year in range(2010, 2016)
]
df = pd.concat(yearly_frames, axis=0, ignore_index=True)

flight_df = spark.createDataFrame(df)

In [0]:
# Spark row count next to the pandas (rows, columns) shape; the row counts should agree.
(flight_df.count(), df.shape)

In [0]:
# Read the same CSVs with Spark itself, distributing them via SparkContext.addFile
from pyspark import SparkFiles
def read_spark_csv(url, year):
  """Read a CSV that was previously registered with ``spark.sparkContext.addFile(url)``.

  Args:
    url: The URL passed to ``addFile``. Its last path segment is the file name
      used to look up the downloaded copy with ``SparkFiles.get``.
    year: Fallback used to build ``"{year}-summary.csv"`` when ``url`` is empty.
      Kept so existing callers continue to work unchanged.

  Returns:
    A Spark DataFrame read with a header row and an inferred schema.
  """
  from urllib.parse import urlparse
  import posixpath

  # Take the name from the URL itself so the lookup cannot drift out of sync
  # with the file that was actually registered via addFile.
  if url:
    csvname = posixpath.basename(urlparse(url).path)
  else:
    csvname = f"{year}-summary.csv"
  filename = "file://" + SparkFiles.get(csvname)
  return spark.read.csv(filename, header=True, inferSchema=True)


from functools import reduce

FLIGHT_CSV_URL_TEMPLATE = (
  "https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/"
  "master/data/flight-data/csv/{year}-summary.csv"
)

# Register and read every year first, then union them. The first year needs
# no special case tied to the range start, so changing the range can never
# union onto a flight_df left over from an earlier cell.
yearly_dfs = []
for year in range(2010, 2016):
  url = FLIGHT_CSV_URL_TEMPLATE.format(year=year)
  spark.sparkContext.addFile(url)
  yearly_dfs.append(read_spark_csv(url, year))

# unionByName matches columns by name rather than position, so a file whose
# header lists the columns in a different order cannot be silently misaligned.
flight_df = reduce(lambda left, right: left.unionByName(right), yearly_dfs)



In [0]:
# Row count and number of columns of the Spark-read DataFrame.
(flight_df.count(), len(flight_df.columns))

In [0]:
# A single CSV file can also be read with explicit DataFrameReader options:
# flightData2015 = spark\
# .read\
# .option("inferSchema", "true")\
# .option("header", "true")\
# .csv("/data/flight-data/csv/2015-summary.csv")

In [0]:
# take(num) collects the first `num` rows to the driver as a list of Row objects.
flight_df.take(num=3)

In [0]:
# With an explicit n, head(n) returns a list of Row objects just like take(n).
flight_df.head(n=3)

In [0]:
# Print the column names, data types and nullability of flight_df as a tree.
flight_df.printSchema()

In [0]:
# Physical plan of a global sort on `count`, using the session's current shuffle settings.
flight_df.orderBy("count").explain()

In [0]:
# Lower the number of post-shuffle partitions (Spark's default is 200),
# presumably to suit this small sample dataset.
spark.conf.set(key="spark.sql.shuffle.partitions", value="5")

In [0]:
# Lazily define the sorted DataFrame; nothing executes until an action runs.
output_df = flight_df.orderBy("count")

In [0]:
# Number of partitions of the sorted DataFrame; compare it with the
# spark.sql.shuffle.partitions value set earlier in the notebook
# (adaptive query execution, if enabled, may coalesce partitions further).
output_df.rdd.getNumPartitions()

In [0]:
# Same sort plan again, now after changing spark.sql.shuffle.partitions.
flight_df.orderBy("count").explain()

In [0]:
# Register flight_df as a temporary view so the SQL cells below can query it as `flight_table`.
flight_df.createOrReplaceTempView('flight_table')

In [0]:
# SQL version: number of rows per destination country.
# No trailing ';' (spark.sql runs one statement, and older Spark parsers reject it);
# the alias matches the column name produced by DataFrame.groupBy(...).count().
destination_in_edges = spark.sql("""
    SELECT DEST_COUNTRY_NAME, count(1) AS count
    FROM flight_table
    GROUP BY DEST_COUNTRY_NAME
""")

In [0]:
# Print the first three aggregated rows.
destination_in_edges.show(n=3)

In [0]:
# DataFrame version of the same per-destination row count.
destination_df = flight_df.groupby("DEST_COUNTRY_NAME").count()

In [0]:
# Print the first three aggregated rows.
destination_df.show(n=3)

In [0]:
# Physical plan of the DataFrame aggregation.
flight_df.groupby("DEST_COUNTRY_NAME").count().explain()

In [0]:
# Physical plan of the equivalent SQL aggregation.
# No trailing ';' (older Spark parsers reject it); alias kept in line with the
# DataFrame version so the two outputs are directly comparable.
spark.sql("""
    SELECT DEST_COUNTRY_NAME, count(1) AS count
    FROM flight_table
    GROUP BY DEST_COUNTRY_NAME
""").explain()

In [0]:
# The DataFrame groupBy and the SQL GROUP BY above should produce the same
# physical plan; compare the two explain() outputs.
# Show the column names and types of flight_df.
flight_df.printSchema()

In [0]:
# DataFrame version: a route with the highest `count`.
from pyspark.sql.functions import max
import pyspark.sql.functions as F
# NOTE(review): the `max` import above shadows Python's built-in max() for the
# rest of the notebook and is unused in this cell; prefer F.max where needed.

flight_df.sort(F.col("count").desc()).take(1)

In [0]:
# SQL version: a route with the highest `count`.
# No trailing ';' (spark.sql runs a single statement; older parsers reject it).
spark.sql("""
    SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count
    FROM flight_table
    ORDER BY count DESC
    LIMIT 1
""").take(1)

In [0]:
from pyspark.sql.functions import desc, sum as spark_sum

# DataFrame version: the five destinations with the largest total `count`.
# Aliasing the aggregate directly avoids relying on Spark's generated
# "sum(count)" column name. withColumnRenamed is a silent no-op when the named
# column is missing, so a naming mismatch would only surface later, at the sort.
(
  flight_df
  .groupBy("DEST_COUNTRY_NAME")
  .agg(spark_sum("count").alias("destination_total"))
  .sort(desc("destination_total"))
  .limit(5)
  .show()
)
        

In [0]:
# SQL version: the five destinations with the largest total `count`.
# The alias matches the DataFrame version's column name, and there is no
# trailing ';' (older Spark parsers reject it).
spark.sql("""
    SELECT DEST_COUNTRY_NAME, sum(count) AS destination_total
    FROM flight_table
    GROUP BY DEST_COUNTRY_NAME
    ORDER BY destination_total DESC
    LIMIT 5
""").show()