In [1]:
# Make the local Spark installation importable before any `pyspark` import
# (the pyspark imports happen in the next cell, so this must run first).
import findspark

findspark.init()

In [2]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, IntegerType, StringType, FloatType


In [3]:
# Build (or reuse) the SparkSession first and derive the SparkContext from it.
# Previously `SparkContext()` was created before the builder: the builder then
# reused that already-running default context, so the appName below did not
# apply to it, and re-running this cell raised
# "Cannot run multiple SparkContexts at once". getOrCreate() is safe to re-run.
spark = (
    SparkSession.builder
    .appName("Python Spark DataFrames basic example")
    # Placeholder option copied from the Spark docs example — TODO replace or remove.
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Keep the `sc` handle for any code that expects a SparkContext variable.
sc = spark.sparkContext

In [4]:
# Load the three QA-flagged tables (presumably written by an earlier QA step —
# confirm) and print each schema.
df_airports_qa = spark.read.parquet("output/airports_qa.parquet")
df_planes_qa = spark.read.parquet("output/planes_qa.parquet")
df_flights_qa = spark.read.parquet("output/flights_qa.parquet")

for qa_frame in (df_airports_qa, df_planes_qa, df_flights_qa):
    qa_frame.printSchema()

root
 |-- faa: string (nullable = true)
 |-- qa_faa: string (nullable = true)
 |-- qa_name: string (nullable = true)
 |-- qa_lat: string (nullable = true)
 |-- qa_lon: string (nullable = true)
 |-- qa_tz: string (nullable = true)
 |-- qa_dst: string (nullable = true)
 |-- qa_alt: string (nullable = true)

root
 |-- tailnum: string (nullable = true)
 |-- qa_tailnum: string (nullable = true)
 |-- qa_year: string (nullable = true)
 |-- qa_type: string (nullable = true)
 |-- qa_manufacturer: string (nullable = true)
 |-- qa_model: string (nullable = true)
 |-- qa_engines: string (nullable = true)
 |-- qa_seats: string (nullable = true)
 |-- qa_speed: string (nullable = true)
 |-- qa_engine: string (nullable = true)

root
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- qa_year_month_day: string (nullable = true)
 |-- qa_hour_minute: string (nullable = true)
 |-- qa_dep_arr: string (nullable = true)
 |-- qa_dep_arr_delay: 

# Perguntas de Qualidade

## Pergunta 1

In [5]:
# Attach to every flight the QA flags of its origin airport, its destination
# airport and its plane. All joins are left joins, so flights without a match
# are kept (with null flags).

# Origin airport: airport columns keep their original names.
df = df_flights_qa.join(df_airports_qa,
                        df_flights_qa.origin == df_airports_qa.faa,
                        "left")

# Destination airport: the same table with every column suffixed by "_dest",
# so it does not clash with the origin-airport columns joined above.
df_airports_qa_1 = df_airports_qa.select(
    [F.col(coluna).alias(coluna + "_dest") for coluna in df_airports_qa.columns]
)
df = df.join(df_airports_qa_1,
             df.dest == df_airports_qa_1.faa_dest,
             "left")

# Plane: the flights table already has its own `qa_tailnum` flag, so the
# plane's flag is renamed. Joining on the column *name* keeps a single
# `tailnum` column instead of two ambiguous ones.
df_planes_qa = df_planes_qa.withColumnRenamed("qa_tailnum", "qa_tailnum_plane")
df = df.join(df_planes_qa, on="tailnum", how="left")

df.printSchema()

root
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- qa_year_month_day: string (nullable = true)
 |-- qa_hour_minute: string (nullable = true)
 |-- qa_dep_arr: string (nullable = true)
 |-- qa_dep_arr_delay: string (nullable = true)
 |-- qa_carrier: string (nullable = true)
 |-- qa_tailnum: string (nullable = true)
 |-- qa_flight: string (nullable = true)
 |-- qa_origin_dest: string (nullable = true)
 |-- qa_air_time: string (nullable = true)
 |-- qa_distance: string (nullable = true)
 |-- qa_distance_airtime: string (nullable = true)
 |-- faa: string (nullable = true)
 |-- qa_faa: string (nullable = true)
 |-- qa_name: string (nullable = true)
 |-- qa_lat: string (nullable = true)
 |-- qa_lon: string (nullable = true)
 |-- qa_tz: string (nullable = true)
 |-- qa_dst: string (nullable = true)
 |-- qa_alt: string (nullable = true)
 |-- faa_dest: string (nullable = true)
 |-- qa_faa_dest: string (nullable = true)
 |-- 

## Pergunta 2

In [6]:
# Drop the key columns used for the joins above; what remains are the qa_* flag columns.
df = df.drop('origin', 'dest', 'faa', 'faa_dest', 'tailnum')

# Seed the results table with one row per remaining column name.
colunas = tuple(df.columns)
data = list(zip(colunas))
df_results = spark.createDataFrame(data, ['nome_coluna'])

In [7]:
%%time
indicadores = {'M' : [], 'F' : [], 'S': [], 'T' : [], 'I' : [], 'C' : []}

for indicador in indicadores:
    result = (df.select([F.count(F.when(F.col(coluna).startswith(indicador), coluna)) for coluna in df.columns]).first())
    indicadores[indicador] = [item for item in result]

Wall time: 7.9 s


In [8]:
%%time
for indicador in indicadores:
    new_column = [[value,col] for value, col in zip(indicadores[indicador], df.columns)]
    new_df = spark.createDataFrame(new_column, [indicador,'nome'])
    df_results = df_results.join(new_df, df_results.nome_coluna == new_df.nome, "left")
    df_results = df_results.drop('nome')
    
df_results.show(50)

+-------------------+----+----+---+----+---+---+
|        nome_coluna|   M|   F|  S|   T|  I|  C|
+-------------------+----+----+---+----+---+---+
|        qa_air_time|  75|   0|  0|   0|  0|  0|
|             qa_alt|   0|   0|  0|   0|  0|  0|
|        qa_alt_dest|   0|   0|  0|   0|  0|  0|
|         qa_carrier|   0|   0|  0|   0|  0|  0|
|         qa_dep_arr|  55| 241|  0|   0|  0|  0|
|   qa_dep_arr_delay|  75|   0|  0|   0|  0|  0|
|        qa_distance|   0|   0|  0|   0|  0|  0|
|qa_distance_airtime|  75|   0|  0|9925|  0|  0|
|             qa_dst|   0|   0|  0|   0|  0|  0|
|        qa_dst_dest|   0|   0|  0|   0|  0|  0|
|          qa_engine|   0|   0|  0|   0|  0|  7|
|         qa_engines|   0|   0|  0|   0|  0|  0|
|             qa_faa|   0|   0|  0|   0|  0|  0|
|        qa_faa_dest|   0|   0|  0|   0|  0|  0|
|          qa_flight|   0|6158|  0|   0|  0|  0|
|     qa_hour_minute|  48|   0|  0|   0|  1|  0|
|             qa_lat|   0|   0|  0|   0|  0|  0|
|        qa_lat_dest

In [9]:
# Persist the results table and read it back, so the following questions query
# the stored parquet data instead of re-evaluating df_results' lineage.
caminho_resultados = 'output/results.parquet'
df_results.write.parquet(caminho_resultados, mode='overwrite')
df_results = spark.read.parquet(caminho_resultados)

## Pergunta 3

In [10]:
# Column with the highest 'M' count (show(1) shows a single row even if several columns tie).
df_results.sort(F.desc('M')).show(1)

+-----------+----+---+---+---+---+---+
|nome_coluna|   M|  F|  S|  T|  I|  C|
+-----------+----+---+---+---+---+---+
|   qa_speed|9443|  0|  0|  0|  0|  0|
+-----------+----+---+---+---+---+---+
only showing top 1 row



## Pergunta 4

In [11]:
%%time
df_results.orderBy(F.col('F').desc()).show(1)

+-----------+---+----+---+---+---+---+
|nome_coluna|  M|   F|  S|  T|  I|  C|
+-----------+---+----+---+---+---+---+
|  qa_flight|  0|6158|  0|  0|  0|  0|
+-----------+---+----+---+---+---+---+
only showing top 1 row

Wall time: 83.8 ms


## Pergunta 5

In [12]:
# Column with the highest 'I' count (show(1) shows a single row even if several columns tie).
df_results.sort(F.desc('I')).show(1)

+-----------+---+---+---+---+---+---+
|nome_coluna|  M|  F|  S|  T|  I|  C|
+-----------+---+---+---+---+---+---+
|    qa_year| 94|  0|  0|  0|  8|  0|
+-----------+---+---+---+---+---+---+
only showing top 1 row

