In [1]:
import h2o
import zipfile
import os
import sys
from pyspark.sql import SparkSession
from IPython.display import display
from pyspark.sql.functions import regexp_extract, col, split, udf, \
                                 trim, when, from_unixtime, unix_timestamp, minute, hour, datediff, lit, array,\
                                 to_date
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, BooleanType, ArrayType, StructType, StructField, LongType, TimestampType
import datetime
import argparse
import json
import glob, os, shutil
import pandas as pd
from pandas.io.json import json_normalize
from pyspark import SparkContext

# Let pandas render up to 99 columns when a frame is displayed.
pd.options.display.max_columns = 99

# Create the Spark context first, then obtain the session used by every later cell.
sc = SparkContext()

spark = (
    SparkSession.builder
    .appName("Data ETL")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Record the Spark version in the cell output.
display(spark.version)

'2.1.1'

# Load Data

In [13]:
# ! ls /home/ubuntu/s3/comb/txt_exception/ -l
# 826736 text files

In [15]:
# txt_exception_folder = '/home/ubuntu/s3/comb/txt_exception/'
# print(txt_exception_folder)

# # Version 1.1
# # flightv1_1 = spark.read.json(os.path.join(txt_exception_folder, "flight_15_13_price_2017-05-11*.txt"))
# flightv1_1 = spark.read.json(os.path.join(txt_exception_folder, "*.txt"))
# display(flightv1_1.count())
# display(flightv1_1.show(1))

/home/ubuntu/s3/comb/txt_exception/


148411

+------------+----------+--------------------+--------------------+--------+-----+----------+--------+------------------+-------+--------------------+--------------------+--------+----+--------------------+-------+
|currencyCode|   depDate|         flight_leg1|         flight_leg2|fromCity|price|searchDate|stayDays|         tableName|task_id|       timeline_leg1|       timeline_leg2|  toCity|trip|                 url|version|
+------------+----------+--------------------+--------------------+--------+-----+----------+--------+------------------+-------+--------------------+--------------------+--------+----+--------------------+-------+
|         AUD|2017-05-11|[[Hangzhou,HGH],2...|[[Bangkok,BKK],20...| Bangkok|420.1|2017-05-10|      14|flight_15_13_price|  16203|[[[Macau, Macau,M...|[[[Macau, Macau,M...|Hangzhou|   2|https://www.exped...|    1.1|
+------------+----------+--------------------+--------------------+--------+-----+----------+--------+------------------+-------+-----------

None

# Define UDFs

In [2]:
def correct_stay_days(trip, stay_days):
    """Return the stay length as an int, or None when there is no return leg.

    Args:
        trip: trip-type marker; the string '1' denotes a one-way trip.
        stay_days: number of days between departure and return, as an int
            or a numeric string. May be None.

    Returns:
        None for one-way trips (trip == '1') and for rows whose stay_days is
        missing; otherwise int(stay_days).
    """
    # A null stay_days would make int() raise TypeError inside the UDF and
    # fail the whole Spark job, so treat it like "no stay" instead.
    if trip == '1' or stay_days is None:
        return None
    return int(stay_days)

# Spark UDF wrapper around correct_stay_days; the result column is declared as IntegerType.
correct_stay_days_UDF = udf(correct_stay_days, IntegerType())

def correct_tickets_left(noOfTicketsLeft):
    """Replace a ticket count of 0 with 99 and pass every other value through.

    NOTE(review): presumably 0 means "no scarcity reported" and 99 is a
    stand-in for "plenty left" -- confirm against the scraper.
    """
    return 99 if noOfTicketsLeft == 0 else noOfTicketsLeft
    
# Spark UDF wrapper around correct_tickets_left; the result column is declared as IntegerType.
correct_tickets_left_UDF = udf(correct_tickets_left, IntegerType())

# Timestamp pattern handed to unix_timestamp() when leg durations are computed.
# NOTE(review): the pattern has no zone-offset field, while the v1.0 sample rows
# show values such as "2017-09-09T11:15:00.000+10:00". If the v1.1 timestamps
# carry offsets too, durations across time zones are computed from local clock
# times -- confirm, and consider appending "XXX" to the pattern.
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"


def _take_all_level1(rows, a):
    """Collect field `a` from every struct in `rows`.

    Returns None when `rows` is None; a None element yields None in its slot.
    """
    if rows is None:
        return None
    return [None if row is None else row[a] for row in rows]


def _take_all_level2(rows, a, b):
    """Collect nested field `a`.`b` from every struct in `rows`.

    Returns None when `rows` is None. A slot is None when the element itself
    or its `a` struct is None (the latter used to raise TypeError).
    """
    if rows is None:
        return None
    picked = []
    for row in rows:
        inner = None if row is None else row[a]
        picked.append(None if inner is None else inner[b])
    return picked


# Both UDFs return an array of strings, one entry per element of the input array.
take_all_level2_str = udf(_take_all_level2, ArrayType(StringType()))
take_all_level1_str = udf(_take_all_level1, ArrayType(StringType()))

# Modify version 1.0

In [37]:
# Version 1.0
flight = spark.read.parquet("/home/ubuntu/s3/comb/flight_v1_0.pq")
display(flight.count())
# show() prints the rows itself and returns None, so wrapping it in display()
# only adds a stray "None" to the cell output.
flight.show(2)


# Meant to turn each stop entry such as "Auckland(AKL):2h35m" into "2h:35m".
# NOTE(review): row.split(":", 1)[1] raises IndexError for any element that has
# no ":" in it -- possibly the reason for the remark below.
take_all_duration_UDF = udf(lambda rows: None if rows is None else [None if row is None else row.split(":", 1)[1].replace("h", "h:") for row in rows], ArrayType(StringType()))
# couldn't get it to work

# Rename the v1.0 columns to the v1.1 names, derive retDate and duration_m
# (minutes), and fix the column order of the result.
flight2 = (flight.withColumn('stayDays', correct_stay_days_UDF(col('trip'), col('stay_days')))
                 .drop('stay_days')
                 .withColumnRenamed('start_date', 'depDate')
                 .withColumn('depDate', to_date('depDate'))
                 .selectExpr('*', 'date_add(depDate, stayDays) as retDate')# this is when the return trip starts, might arrive a day later
                 .withColumnRenamed('from_city_name', 'fromCity')
                 .withColumnRenamed('to_city_name', 'toCity')
                 .withColumnRenamed('search_date', 'searchDate')
                 .withColumn('searchDate', to_date('searchDate'))
                 .withColumnRenamed('company', 'airlineName')
                 .withColumnRenamed('dep_time', 'departureTime')
                 .withColumnRenamed('arr_time', 'arrivalTime')
                 # duration is split on 'h' into an hour part and a minute part.
                 .withColumn('duration_h', split(flight.duration,'h').getItem(0))
                 .withColumn('duration_m', F.substring_index(split(flight.duration,'h').getItem(1), 'm', 1))
#                  .withColumn('duration', F.struct(col('duration_h'), col('duration_m')))
                 # total minutes; the string parts are cast implicitly, giving a double column
                 .withColumn('duration_m', (col('duration_h')*60 + col('duration_m')))
                 .drop('duration', 'duration_h', 'flight_number')
                 .withColumnRenamed('price_code', 'currencyCode')
                 .withColumnRenamed('stop', 'stops')
                 .withColumn('stops', col('stops').cast('byte'))
                 .withColumn('stop_info', split(col('stop_info'), ';'))
#                  .withColumn('stop_duration', take_all_duration_UDF(col('stop_info')))
                 .withColumn('noOfTicketsLeft', correct_tickets_left_UDF('ticket_left'))
                 .withColumn('noOfTicketsLeft', col('noOfTicketsLeft').cast('byte'))
                 .drop('ticket_left')
                 .withColumnRenamed('table_name', 'tableName')
                 .select('price', 'version', 'searchDate', 'tableName', 'task_id', 'currencyCode',
                         'fromCity', 'toCity', 'trip', 'depDate', 'retDate',
                         'stayDays',
                         'departureTime', 'arrivalTime',
                         'airlineName',  'duration_m',
                         'flight_code', 'plane', 'stops', 'noOfTicketsLeft',
                         'airline_code', 'airline_codes',
                         'stop_info', 'span_days', 'power', 'video', 'wifi')                #'stop_duration',
          )
# variables added in v1.1: 'departureTime_leg2', 'arrivalTime_leg2', 'airlineName_leg2','duration_m_leg2','stops_leg2'
#  'noOfTicketsLeft_leg2','airline_codes_leg2',
# 'stop_list', 'url'

# variables dropped in v1.1:
# 'span_days', 'power', 'video', 'wifi', 'stop_info'

# One sample row per trip type, then the resulting schema.
flight2.where(col('trip') == 1).show(1)
flight2.where(col('trip') == 2).show(1, truncate=False)
flight2.printSchema()

# flight2.select('flight_code', 'flight_number').distinct().show(1000)
# flight2.select('stop_info').distinct().show()
# flight2.select('stop_list').distinct().show(100, truncate=False)

2288103

+--------------+----------+---------+----------------+-------+------------+----+-------+------------+-------------+--------------------+-------------+--------------------+--------------------+--------+-----------+-------------+-----+--------------------+-----+------+----------+-----------+---------+----+-------------------+-----------+-----+----+
|from_city_name|start_date|stay_days|      table_name|task_id|to_city_name|trip|version|airline_code|airline_codes|            arr_time|check_bag_inc|             company|            dep_time|duration|flight_code|flight_number|index|               plane|power| price|price_code|search_date|span_days|stop|          stop_info|ticket_left|video|wifi|
+--------------+----------+---------+----------------+-------+------------+----+-------+------------+-------------+--------------------+-------------+--------------------+--------------------+--------+-----------+-------------+-----+--------------------+-----+------+----------+-----------+---------+--

None

+------+-------+----------+----------------+-------+------------+--------+-------+----+----------+-------+--------+--------------------+--------------------+--------------+----------+-----------+----------------+-----+---------------+------------+-------------+--------------------+---------+-----+-----+-----+
| price|version|searchDate|       tableName|task_id|currencyCode|fromCity| toCity|trip|   depDate|retDate|stayDays|       departureTime|         arrivalTime|   airlineName|duration_m|flight_code|           plane|stops|noOfTicketsLeft|airline_code|airline_codes|           stop_info|span_days|power|video| wifi|
+------+-------+----------+----------------+-------+------------+--------+-------+----+----------+-------+--------+--------------------+--------------------+--------------+----------+-----------+----------------+-----+---------------+------------+-------------+--------------------+---------+-----+-----+-----+
|605.72|    1.0|2017-05-01|flight_1_5_price|    676|         AUD|  

None

+-----+-------+----------+----------------+-------+------------+--------+-------+----+----------+----------+--------+-----------------------------+-----------------------------+--------------+----------+-----------+-----------------------------------+-----+---------------+------------+-------------+---------------------+---------+-----+-----+----+
|price|version|searchDate|tableName       |task_id|currencyCode|fromCity|toCity |trip|depDate   |retDate   |stayDays|departureTime                |arrivalTime                  |airlineName   |duration_m|flight_code|plane                              |stops|noOfTicketsLeft|airline_code|airline_codes|stop_info            |span_days|power|video|wifi|
+-----+-------+----------+----------------+-------+------------+--------+-------+----+----------+----------+--------+-----------------------------+-----------------------------+--------------+----------+-----------+-----------------------------------+-----+---------------+------------+-------------+

None

root
 |-- price: double (nullable = true)
 |-- version: string (nullable = true)
 |-- searchDate: date (nullable = true)
 |-- tableName: string (nullable = true)
 |-- task_id: string (nullable = true)
 |-- currencyCode: string (nullable = true)
 |-- fromCity: string (nullable = true)
 |-- toCity: string (nullable = true)
 |-- trip: string (nullable = true)
 |-- depDate: date (nullable = true)
 |-- retDate: date (nullable = true)
 |-- stayDays: integer (nullable = true)
 |-- departureTime: string (nullable = true)
 |-- arrivalTime: string (nullable = true)
 |-- airlineName: string (nullable = true)
 |-- duration_m: double (nullable = true)
 |-- flight_code: string (nullable = true)
 |-- plane: string (nullable = true)
 |-- stops: byte (nullable = true)
 |-- noOfTicketsLeft: byte (nullable = true)
 |-- airline_code: string (nullable = true)
 |-- airline_codes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stop_info: array (nullable = true)
 |    |-- element

In [42]:
# flight2.repartition(1).write.mode('append').parquet(os.path.join("/home/ubuntu/s3/pq_v1_1/", "flight_v1_1a"))
# Write the converted v1.0 data as a single-partition parquet.
# No save mode is set, so Spark's default applies and the write fails if the
# target path already exists (i.e. this cell cannot simply be re-run).
flight2.repartition(1).write.parquet(os.path.join("/home/ubuntu/s3/pq_v1_1/", "flight_v1_0a"))

In [43]:
# Read back the parquet written in the previous cell and preview two rows.
# The path is spelled out because `pq_folder` is not defined anywhere earlier
# in the notebook, so the original expression fails on a fresh kernel.
spark.read.parquet(os.path.join("/home/ubuntu/s3/pq_v1_1/", "flight_v1_0a")).limit(2).toPandas()

Unnamed: 0,price,version,searchDate,tableName,task_id,currencyCode,fromCity,toCity,trip,depDate,retDate,stayDays,departureTime,arrivalTime,airlineName,duration_m,flight_code,plane,stops,noOfTicketsLeft,airline_code,airline_codes,stop_info,span_days,power,video,wifi
0,0.0,1.0,2017-05-08,flight_1_5_price,620,,sydney,beijing,2,2017-09-09,2017-10-07,28,2017-09-09T11:15:00.000+10:00,2017-09-10T04:10:00.000+08:00,Qantas Airways,1135.0,QF145,BOEING 737-800 (WINGLETS) PASSENGER,1,99,QF,"[QF, CA]",[Auckland(AKL):2h35m],0,,,
1,472.14,1.0,2017-05-08,flight_1_5_price,620,AUD,sydney,beijing,2,2017-09-09,2017-10-07,28,2017-09-09T20:50:00.000+10:00,2017-09-10T17:25:00.000+08:00,China Eastern Airlines,1355.0,MU778,AIRBUS INDUSTRIE A330-200,1,4,MU,"[MU, MU]",[Kunming(KMG):8h30m],0,,,


In [4]:
# flight2.select('stop_info').distinct().show(100, truncate=False)

In [5]:
# # flight2.sample(False, 0.001, 42).toPandas()

# flight2.limit(10).toPandas()


# Modify Version 1.1

In [6]:

# # take_all_level1_str = udf(lambda rows, a: [row[a] for row in rows], ArrayType(StringType()))
# take_all_level2_str = udf(lambda rows, a, b:  [None if row is None else row[a][b] for row in rows], ArrayType(StringType()))
# # take_all = udf(lambda rows, a: [row[a]['city'] for row in rows], ArrayType(StringType()))


# flightv1_1.withColumn("city", take_all_level2_str(flightv1_1.timeline_leg1, lit('arrivalAirport'), lit('city')))\
#                       .select('airports').show(10)
# # flightv1_1.withColumn("airports", take_all_level1_str(flightv1_1.timeline_leg1, 'type')).select('airports').show(10)
# # flightv1_1.withColumn("airports", take_all(flightv1_1.timeline_leg1, lit('arrivalAirport'))).select('airports').show(10)

# # display(flightv1_1.select('timeline_leg1').show(100, truncate=False))



In [7]:
# df.selectExpr("explode(check) as e").select("e.*").show()

# flightv1_1.selectExpr('explode(timeline_leg1) as e').select('e.*').show(truncate=False)

### Need to split stop list into duration and make it compatible with v1.0 - Can't figure out how to do this for v1.0

# Main function to convert text files to parquet

In [8]:
# (output column name, carrierSummary field) pairs copied for each flight leg.
_CARRIER_SUMMARY_FIELDS = (
    ('carrierAirProviderId', 'airProviderId'),
    ('carrierAirlineImageFileName', 'airlineImageFileName'),
    ('carrierMixedCabinClass', 'mixedCabinClass'),
    ('carrierMultiStop', 'multiStop'),
    ('carrierNextDayArrival', 'nextDayArrival'),
)

# (output column stem, airport struct field) pairs extracted from every timeline entry.
_TIMELINE_AIRPORT_FIELDS = (
    ('cityState', 'airportCityState'),
    ('city', 'city'),
    ('code', 'code'),
    ('localName', 'localName'),
    ('longName', 'longName'),
    ('name', 'name'),
)

# Columns written ahead of the timeline_* columns, in output order.
_V1_1_LEADING_COLUMNS = [
    'price', 'version', 'searchDate', 'tableName', 'task_id', 'currencyCode',
    'fromCity', 'toCity', 'trip', 'depDate', 'retDate',
    'stayDays',
    'departureTime', 'arrivalTime', 'departureTime_leg2', 'arrivalTime_leg2',
    'airlineName', 'airlineName_leg2', 'duration_m', 'duration_m_leg2',
    'flight_code', 'plane', 'stops', 'stops_leg2',
    'stop_airport', 'stop_duration', 'stop_airport_leg2', 'stop_duration_leg2',
    'noOfTicketsLeft', 'noOfTicketsLeft_leg2',
    'airline_code', 'airline_codes', 'airline_codes_leg2',
    'url', 'fromCityAirportCode', 'toCityAirportCode',
    'fromCityAirportCode_leg2', 'toCityAirportCode_leg2',
    'carrierAirProviderId', 'carrierAirlineImageFileName', 'carrierMixedCabinClass',
    'carrierMultiStop', 'carrierNextDayArrival',
    'carrierAirProviderId_leg2', 'carrierAirlineImageFileName_leg2', 'carrierMixedCabinClass_leg2',
    'carrierMultiStop_leg2', 'carrierNextDayArrival_leg2',
]


def _leg_columns(leg, suffix):
    """Return (name, Column) pairs derived from one flight-leg struct; names end in `suffix`."""
    summary = leg.carrierSummary
    # NOTE(review): timeFmt has no zone-offset field; if the timestamps carry
    # offsets, this is a difference of local clock times -- confirm.
    minutes = (F.unix_timestamp(leg.arrivalTime, format=timeFmt) -
               F.unix_timestamp(leg.departureTime, format=timeFmt)) / 60
    pairs = [
        ('airline_codes' + suffix, summary.airlineCodes),
        ('departureTime' + suffix, leg.departureTime),
        ('arrivalTime' + suffix, leg.arrivalTime),
        ('airlineName' + suffix, summary.airlineName),
        ('duration_m' + suffix, minutes),
        ('stops' + suffix, leg.stops.cast('byte')),
        ('stop_airport' + suffix, take_all_level1_str(leg.stop_list, lit('airport'))),
        ('stop_duration' + suffix, take_all_level1_str(leg.stop_list, lit('duration'))),
        ('noOfTicketsLeft' + suffix,
         correct_tickets_left_UDF(summary.noOfTicketsLeft).cast('byte')),
        ('fromCityAirportCode' + suffix, leg.departureLocation.airportCode),
        ('toCityAirportCode' + suffix, leg.arrivalLocation.airportCode),
    ]
    pairs.extend((name + suffix, summary.getField(field))
                 for name, field in _CARRIER_SUMMARY_FIELDS)
    return pairs


def _timeline_columns(timeline, suffix):
    """Return (name, Column) pairs extracted from one timeline array, in output order."""
    pairs = []
    for end in ('departure', 'arrival'):
        airport = end + 'Airport'
        for stem, field in _TIMELINE_AIRPORT_FIELDS:
            pairs.append(('timeline_%s_%s%s' % (airport, stem, suffix),
                          take_all_level2_str(timeline, lit(airport), lit(field))))
        moment = end + 'Time'
        pairs.append(('timeline_%s%s' % (moment, suffix),
                      take_all_level2_str(timeline, lit(moment), lit('isoStr'))))
    pairs.append(('timeline_distance' + suffix,
                  take_all_level2_str(timeline, lit('distance'), lit('formattedTotal'))))
    pairs.append(('timeline_plane' + suffix,
                  take_all_level2_str(timeline, lit('carrier'), lit('plane'))))
    pairs.append(('timeline_brandedFareName' + suffix,
                  take_all_level1_str(timeline, lit('brandedFareName'))))
    pairs.append(('timeline_type' + suffix,
                  take_all_level1_str(timeline, lit('type'))))
    return pairs


def txtToPq(inputFolder, pqFolder, pqFileName, searchString = "*.txt", append = True):
    """
    Read in all txt files in a folder, convert to parquet, and either append parquet or create new parquet
    @params:
        inputFolder   - Required  : input folder that contains json line txt files (Str)
        pqFolder      - Required  : folder to save the parquet files into (Str)
        pqFileName    - Required  : parquet file name (Str)
        searchString  - Optional  : search string that identifies all the json line text files (Str)
        append        - Optional  : append to existing parquet (True) or create new parquet (False) (Bool)

    With append=False no save mode is set, so Spark's default applies and the
    write fails if the target already exists.
    """

    flightv1_1 = spark.read.json(os.path.join(inputFolder, searchString))

    # flight_code and plane come from the first timeline entry of leg 1.
    first_carrier = flightv1_1.timeline_leg1.getItem(0).carrier
    derived = [
        ('airline_code', flightv1_1.flight_leg1.carrierSummary.airlineCodes.getItem(0)),
        ('flight_code', F.concat(first_carrier.airlineCode,
                                 first_carrier.flightNumber.cast('string'))),
        ('plane', first_carrier.plane),
    ]

    # Leg 1 columns carry no suffix, leg 2 columns end in '_leg2'.
    legs = (
        (flightv1_1.flight_leg1, flightv1_1.timeline_leg1, ''),
        (flightv1_1.flight_leg2, flightv1_1.timeline_leg2, '_leg2'),
    )
    timeline_names = []
    for leg, timeline, suffix in legs:
        derived.extend(_leg_columns(leg, suffix))
        timeline_pairs = _timeline_columns(timeline, suffix)
        derived.extend(timeline_pairs)
        timeline_names.extend(name for name, _ in timeline_pairs)

    flightv1_1_2 = (flightv1_1.withColumn('trip', col('trip').cast('string'))
                    .withColumn('stayDays', correct_stay_days_UDF(col('trip'), col('stayDays')))
                    .withColumn('depDate', to_date('depDate'))
                    .withColumn('searchDate', to_date('searchDate'))
                    # this is when the return trip starts, might arrive a day later
                    .selectExpr('*', 'date_add(depDate, stayDays) as retDate'))
    for name, expression in derived:
        flightv1_1_2 = flightv1_1_2.withColumn(name, expression)
    flightv1_1_2 = flightv1_1_2.select(*(_V1_1_LEADING_COLUMNS + timeline_names))

    writer = flightv1_1_2.repartition(1).write
    if append:
        writer = writer.mode('append')
    writer.parquet(os.path.join(pqFolder, pqFileName))

In [25]:
# Same as txtToPq, but for input where flight_leg1/flight_leg2 are renamed to leg1/leg2
# (output column, source field of <leg>.carrierSummary), in output order.
# Leg-2 columns reuse these names with a '_leg2' suffix.
_CARRIER_SUMMARY_COLUMNS_V2 = (
    ('carrierAirProviderId', 'airProviderId'),
    ('carrierAirlineImageFileName', 'airlineImageFileName'),
    ('carrierMixedCabinClass', 'mixedCabinClass'),
    ('carrierMultiStop', 'multiStop'),
    ('carrierNextDayArrival', 'nextDayArrival'),
)

# (output column, key path handed to the timeline extractor), in output order.
# A one-key path goes through take_all_level1_str, a two-key path through
# take_all_level2_str. Leg-2 columns reuse these names with a '_leg2' suffix.
_TIMELINE_COLUMNS_V2 = (
    # departure
    ('timeline_departureAirport_cityState', ('departureAirport', 'airportCityState')),
    ('timeline_departureAirport_city', ('departureAirport', 'city')),
    ('timeline_departureAirport_code', ('departureAirport', 'code')),
    ('timeline_departureAirport_localName', ('departureAirport', 'localName')),
    ('timeline_departureAirport_longName', ('departureAirport', 'longName')),
    ('timeline_departureAirport_name', ('departureAirport', 'name')),
    ('timeline_departureTime', ('departureTime', 'isoStr')),
    # arrival
    ('timeline_arrivalAirport_cityState', ('arrivalAirport', 'airportCityState')),
    ('timeline_arrivalAirport_city', ('arrivalAirport', 'city')),
    ('timeline_arrivalAirport_code', ('arrivalAirport', 'code')),
    ('timeline_arrivalAirport_localName', ('arrivalAirport', 'localName')),
    ('timeline_arrivalAirport_longName', ('arrivalAirport', 'longName')),
    ('timeline_arrivalAirport_name', ('arrivalAirport', 'name')),
    ('timeline_arrivalTime', ('arrivalTime', 'isoStr')),
    # distance, carrier, brandedFareName, type
    ('timeline_distance', ('distance', 'formattedTotal')),
    ('timeline_plane', ('carrier', 'plane')),
    ('timeline_brandedFareName', ('brandedFareName',)),
    ('timeline_type', ('type',)),
)


def _timeline_column_v2(timeline, path):
    """Apply the level-1 or level-2 timeline extractor, chosen by the number of keys in path."""
    keys = [lit(key) for key in path]
    extractor = take_all_level1_str if len(keys) == 1 else take_all_level2_str
    return extractor(timeline, *keys)


def txtToPq_v2(inputFolder, pqFolder, pqFileName, searchString = "*.txt", append = True):
    """
    Read all json line txt files in a folder, flatten them, and write the result as parquet
    (one partition per call), either appending to an existing parquet or creating a new one
    @params:
        inputFolder   - Required  : input folder that contains json line txt files (Str)
        pqFolder      - Required  : folder to save the parquet files into (Str)
        pqFileName    - Required  : parquet file name (Str)
        searchString  - Optional  : search string that identifies all the json line text files (Str)
        append        - Optional  : True appends to the parquet at pqFolder/pqFileName,
                                    False writes with Spark's default save mode (Bool)
    @returns:
        None; the only result is the parquet written to disk
    """
    raw = spark.read.json(os.path.join(inputFolder, searchString))
    # first entry of the leg-1 timeline; source of flight_code and plane
    first_segment = raw.timeline1.getItem(0)

    flights = (raw.withColumn('trip', col('trip').cast('string'))
                  .withColumn('stayDays', correct_stay_days_UDF(col('trip'), col('stayDays')))
                  .withColumn('depDate', to_date('depDate'))
                  .withColumn('searchDate', to_date('searchDate'))
                  # this is when the return trip starts, might arrive a day later
                  .selectExpr('*', 'date_add(depDate, stayDays) as retDate')
                  .withColumn('airline_code', raw.leg1.carrierSummary.airlineCodes.getItem(0))
                  # airlineCode/flightNumber exist only long enough to build flight_code
                  .withColumn('airlineCode', first_segment.carrier.airlineCode)
                  .withColumn('flightNumber', first_segment.carrier.flightNumber.cast('string'))
                  .select('*', F.concat(col('airlineCode'), col('flightNumber')).alias('flight_code'))
                  .drop('airlineCode', 'flightNumber')
                  .withColumn('plane', first_segment.carrier.plane))

    carrier_names = []
    timeline_names = []
    # Both legs get the same derived columns; leg-1 names carry no suffix.
    for leg, timeline, suffix in ((raw.leg1, raw.timeline1, ''),
                                  (raw.leg2, raw.timeline2, '_leg2')):
        summary = leg.carrierSummary
        departure = 'departureTime' + suffix
        arrival = 'arrivalTime' + suffix
        tickets_left = 'noOfTicketsLeft' + suffix

        flights = (flights
                   .withColumn('airline_codes' + suffix, summary.airlineCodes)
                   .withColumn(departure, leg.departureTime.isoStr)
                   .withColumn(arrival, leg.arrivalTime.isoStr)
                   .withColumn('airlineName' + suffix, summary.airlineName)
                   # unix_timestamp differences are in seconds; /60 gives minutes
                   .withColumn('duration_m' + suffix,
                               (F.unix_timestamp(arrival, format=timeFmt) -
                                F.unix_timestamp(departure, format=timeFmt))/60)
                   .withColumn('stops' + suffix, leg.stops.cast('byte'))
                   .withColumn('stop_airport' + suffix, take_all_level1_str(leg.stop_list, lit('airport')))
                   .withColumn('stop_duration' + suffix, take_all_level1_str(leg.stop_list, lit('duration')))
                   .withColumn(tickets_left, correct_tickets_left_UDF(summary.noOfTicketsLeft))
                   .withColumn(tickets_left, col(tickets_left).cast('byte'))
                   .withColumn('fromCityAirportCode' + suffix, leg.departureLocation.airportCode)
                   .withColumn('toCityAirportCode' + suffix, leg.arrivalLocation.airportCode))

        for name, field in _CARRIER_SUMMARY_COLUMNS_V2:
            flights = flights.withColumn(name + suffix, summary[field])
            carrier_names.append(name + suffix)

        for name, path in _TIMELINE_COLUMNS_V2:
            flights = flights.withColumn(name + suffix, _timeline_column_v2(timeline, path))
            timeline_names.append(name + suffix)

    # Output column order: trip-level columns, carrier columns (leg 1 then leg 2),
    # timeline columns (leg 1 then leg 2). 'url' is not selected.
    output_columns = [
        'price', 'version', 'searchDate', 'tableName', 'task_id', 'currencyCode',
        'fromCity', 'toCity', 'trip', 'depDate', 'retDate',
        'stayDays',
        'departureTime', 'arrivalTime', 'departureTime_leg2', 'arrivalTime_leg2',
        'airlineName', 'airlineName_leg2', 'duration_m', 'duration_m_leg2',
        'flight_code', 'plane', 'stops', 'stops_leg2',
        'stop_airport', 'stop_duration', 'stop_airport_leg2', 'stop_duration_leg2',
        'noOfTicketsLeft', 'noOfTicketsLeft_leg2',
        'airline_code', 'airline_codes', 'airline_codes_leg2',
        'fromCityAirportCode', 'toCityAirportCode', 'fromCityAirportCode_leg2', 'toCityAirportCode_leg2',
    ] + carrier_names + timeline_names

    writer = flights.select(*output_columns).repartition(1).write
    if append:
        writer = writer.mode('append')
    writer.parquet(os.path.join(pqFolder, pqFileName))

In [12]:
# Working folders used by the cells below.
zip_folder = '/home/ubuntu/s3/zip/'  # presumably where downloaded zip archives are kept -- TODO confirm
txt_folder = '/home/ubuntu/s3/txt/'  # target of `unzip -d` in the download cell
txt_exception_folder = '/home/ubuntu/s3/comb/txt_exception/'
pq_folder = '/home/ubuntu/s3/pq_v1_1/'  # passed as pqFolder when converting to parquet
txt_new_exception_folder = '/home/ubuntu/s3/txt_exception/'

# Create the folders; `mkdir -p` is a no-op for folders that already exist.
! mkdir -p $zip_folder
! mkdir -p $txt_folder
! mkdir -p $txt_exception_folder
! mkdir -p $pq_folder
! mkdir -p $txt_new_exception_folder

# Tidy up working folders: delete everything inside the zip/txt folders.
# NOTE(review): these paths are hard-coded rather than taken from the variables above
# (and comb/zip, comb/txt have no variable), so they will not follow a change to
# zip_folder / txt_folder.
! rm -rf /home/ubuntu/s3/comb/zip/*
! rm -rf /home/ubuntu/s3/comb/txt/*
! rm -rf /home/ubuntu/s3/zip/*
! rm -rf /home/ubuntu/s3/txt/*
# -f ("force"): never prompt, and do not complain about files that do not exist.
# -r ("recursive"): descend into every sub-folder and remove its contents too.


In [13]:
# download another v1.1 file, unzip and test
! cd $zip_folder
! aws s3 cp s3://flight.price.11/flight_10_1/flight_10_1_price_2017-05-15.zip flight_10_1_price_2017-05-15.zip
! sudo apt-get install unzip
! unzip flight_10_1_price_2017-05-15.zip -d $txt_folder

download: s3://flight.price.11/flight_10_1/flight_10_1_price_2017-05-15.zip to ./flight_10_1_price_2017-05-15.zip
Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following packages were automatically installed and are no longer required:
  linux-aws-headers-4.4.0-1013 linux-aws-headers-4.4.0-1016
  linux-aws-headers-4.4.0-1017 linux-headers-4.4.0-1013-aws
  linux-headers-4.4.0-1016-aws linux-headers-4.4.0-1017-aws
  linux-image-4.4.0-1013-aws linux-image-4.4.0-1016-aws
  linux-image-4.4.0-1017-aws
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  zip
The following NEW packages will be installed:
  unzip
0 upgraded, 1 newly installed, 0 to remove and 47 not upgraded.
Need to get 158 kB of archives.
After this operation, 530 kB of additional disk space will be used.
Get:1 http://ap-southeast-2.ec2.archive.ubuntu.com/ubuntu xenial/main amd64 unzip amd64 6.0-20ubuntu1 [158 kB]
Fetched 158 kB in 0s (11.3 MB/s)
Selecting previ

In [23]:
# Load the unzipped sample as JSON lines, then show the inferred schema and up to 10 rows.
final_results_glob = os.path.join('/home/ubuntu/s3/txt/final_results/', "*.txt")
flightv1_1 = spark.read.json(final_results_glob)
flightv1_1.printSchema()
flightv1_1.limit(10).toPandas()

root
 |-- currencyCode: string (nullable = true)
 |-- depDate: string (nullable = true)
 |-- fromCity: string (nullable = true)
 |-- leg1: struct (nullable = true)
 |    |-- arrivalLocation: struct (nullable = true)
 |    |    |-- airportCity: string (nullable = true)
 |    |    |-- airportCode: string (nullable = true)
 |    |-- arrivalTime: struct (nullable = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- dateLongStr: string (nullable = true)
 |    |    |-- dateTime: long (nullable = true)
 |    |    |-- hour: string (nullable = true)
 |    |    |-- isoStr: string (nullable = true)
 |    |    |-- time: string (nullable = true)
 |    |    |-- travelDate: string (nullable = true)
 |    |-- carrierSummary: struct (nullable = true)
 |    |    |-- airProviderId: long (nullable = true)
 |    |    |-- airlineCodes: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- airlineImageFileName: string (nullable = true)
 |    |    |--

Unnamed: 0,currencyCode,depDate,fromCity,leg1,leg2,price,searchDate,stayDays,tableName,task_id,timeline1,timeline2,toCity,trip,version
0,AUD,2017-06-03,Chengdu,"((Sydney, SYD), (4/6/2017, Sun., 4 Jun., 14965...","((Chengdu, CTU), (25/6/2017, Sun., 25 Jun., 14...",848.54,2017-05-15,21,flight_10_1_price,94,"[((Guangzhou, China, Guangzhou, CAN, Baiyun In...","[((Guangzhou, China, Guangzhou, CAN, Baiyun In...",sydney,2,1.1
1,AUD,2017-06-03,Chengdu,"((Sydney, SYD), (4/6/2017, Sun., 4 Jun., 14965...","((Chengdu, CTU), (25/6/2017, Sun., 25 Jun., 14...",848.54,2017-05-15,21,flight_10_1_price,94,"[((Guangzhou, China, Guangzhou, CAN, Baiyun In...","[((Guangzhou, China, Guangzhou, CAN, Baiyun In...",sydney,2,1.1
2,AUD,2017-06-03,Chengdu,"((Sydney, SYD), (4/6/2017, Sun., 4 Jun., 14965...","((Chengdu, CTU), (24/6/2017, Sat., 24 Jun., 14...",848.54,2017-05-15,21,flight_10_1_price,94,"[((Guangzhou, China, Guangzhou, CAN, Baiyun In...","[((Guangzhou, China, Guangzhou, CAN, Baiyun In...",sydney,2,1.1
3,AUD,2017-06-03,Chengdu,"((Sydney, SYD), (4/6/2017, Sun., 4 Jun., 14965...","((Chengdu, CTU), (24/6/2017, Sat., 24 Jun., 14...",848.54,2017-05-15,21,flight_10_1_price,94,"[((Guangzhou, China, Guangzhou, CAN, Baiyun In...","[((Guangzhou, China, Guangzhou, CAN, Baiyun In...",sydney,2,1.1
4,AUD,2017-06-03,Chengdu,"((Sydney, SYD), (4/6/2017, Sun., 4 Jun., 14965...","((Chengdu, CTU), (25/6/2017, Sun., 25 Jun., 14...",994.74,2017-05-15,21,flight_10_1_price,94,"[((Xiamen, China, Xiamen, XMN, Xiamen Intl., X...","[((Xiamen, China, Xiamen, XMN, Xiamen Intl., X...",sydney,2,1.1
5,AUD,2017-06-03,Chengdu,"((Sydney, SYD), (4/6/2017, Sun., 4 Jun., 14965...","((Chengdu, CTU), (25/6/2017, Sun., 25 Jun., 14...",994.74,2017-05-15,21,flight_10_1_price,94,"[((Xiamen, China, Xiamen, XMN, Xiamen Intl., X...","[((Xiamen, China, Xiamen, XMN, Xiamen Intl., X...",sydney,2,1.1
6,AUD,2017-06-03,Chengdu,"((Sydney, SYD), (4/6/2017, Sun., 4 Jun., 14965...","((Chengdu, CTU), (25/6/2017, Sun., 25 Jun., 14...",994.74,2017-05-15,21,flight_10_1_price,94,"[((Xiamen, China, Xiamen, XMN, Xiamen Intl., X...","[((Xiamen, China, Xiamen, XMN, Xiamen Intl., X...",sydney,2,1.1
7,AUD,2017-06-03,Chengdu,"((Sydney, SYD), (4/6/2017, Sun., 4 Jun., 14965...","((Chengdu, CTU), (25/6/2017, Sun., 25 Jun., 14...",1004.95,2017-05-15,21,flight_10_1_price,94,"[((Kuala Lumpur, Malaysia, Kuala Lumpur, KUL, ...","[((Kuala Lumpur, Malaysia, Kuala Lumpur, KUL, ...",sydney,2,1.1
8,AUD,2017-06-03,Chengdu,"((Sydney, SYD), (4/6/2017, Sun., 4 Jun., 14965...","((Chengdu, CTU), (25/6/2017, Sun., 25 Jun., 14...",1054.6,2017-05-15,21,flight_10_1_price,94,"[((Kuala Lumpur, Malaysia, Kuala Lumpur, KUL, ...","[((Kuala Lumpur, Malaysia, Kuala Lumpur, KUL, ...",sydney,2,1.1
9,AUD,2017-06-03,Chengdu,"((Sydney, SYD), (4/6/2017, Sun., 4 Jun., 14965...","((Chengdu, CTU), (24/6/2017, Sat., 24 Jun., 14...",1090.54,2017-05-15,21,flight_10_1_price,94,"[((Guangzhou, China, Guangzhou, CAN, Baiyun In...","[((Guangzhou, China, Guangzhou, CAN, Baiyun In...",sydney,2,1.1


In [19]:
# Inspect the leg-1 arrivalTime struct without truncating the cell contents.
leg1_arrival_time = flightv1_1.select("leg1.arrivalTime")
leg1_arrival_time.show(truncate=False)

+------------------------------------------------------------------------------------+
|arrivalTime                                                                         |
+------------------------------------------------------------------------------------+
|[4/6/2017,Sun., 4 Jun.,1496569200000,null,2017-06-04T19:40:00.000+10:00,7:40pm,null]|
|[4/6/2017,Sun., 4 Jun.,1496569200000,null,2017-06-04T19:40:00.000+10:00,7:40pm,null]|
|[4/6/2017,Sun., 4 Jun.,1496569200000,null,2017-06-04T19:40:00.000+10:00,7:40pm,null]|
|[4/6/2017,Sun., 4 Jun.,1496569200000,null,2017-06-04T19:40:00.000+10:00,7:40pm,null]|
|[4/6/2017,Sun., 4 Jun.,1496531700000,null,2017-06-04T09:15:00.000+10:00,9:15am,null]|
|[4/6/2017,Sun., 4 Jun.,1496531700000,null,2017-06-04T09:15:00.000+10:00,9:15am,null]|
|[4/6/2017,Sun., 4 Jun.,1496531700000,null,2017-06-04T09:15:00.000+10:00,9:15am,null]|
|[4/6/2017,Sun., 4 Jun.,1496571000000,null,2017-06-04T20:10:00.000+10:00,8:10pm,null]|
|[4/6/2017,Sun., 4 Jun.,1496533800000,null,

In [44]:
# Convert the unzipped sample with txtToPq_v2, appending to the "flight_v1_0a" parquet,
# then read the parquet back as a quick check.
# Alternative kept from an earlier run: pqFileName = "flight_v1_1" with append = False.
# NOTE(review): the recorded run of this cell fails on the read-back with
# "ClassCastException: [B cannot be cast to java.lang.Long" -- presumably the files
# appended here disagree with files already in flight_v1_0a on some column's type;
# confirm before re-running.
converted_name = "flight_v1_0a"
txtToPq_v2(inputFolder = '/home/ubuntu/s3/txt/final_results/',
           pqFolder = pq_folder,
           pqFileName = converted_name,
           searchString = "*.txt",
           append = True)

spark.read.parquet(pq_folder + converted_name).limit(2).toPandas()

Py4JJavaError: An error occurred while calling o3569.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 77.0 failed 1 times, most recent failure: Lost task 0.0 in stage 77.0 (TID 26661, localhost, executor driver): org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/home/ubuntu/s3/pq_v1_1/flight_v1_0a/part-00000-e6c206b0-608c-46da-8b5a-b70edcefc103.snappy.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:184)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
	at org.apache.spark.sql.catalyst.expressions.MutableLong.update(SpecificInternalRow.scala:149)
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:240)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:159)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:323)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:371)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
	... 21 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2768)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2765)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/home/ubuntu/s3/pq_v1_1/flight_v1_0a/part-00000-e6c206b0-608c-46da-8b5a-b70edcefc103.snappy.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:184)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
	at org.apache.spark.sql.catalyst.expressions.MutableLong.update(SpecificInternalRow.scala:149)
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:240)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:159)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:323)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:371)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
	... 21 more


In [30]:
# Spot check: pull two rows with fromCity == "Chengdu" from the flight_v1_1a
# parquet dataset into a pandas DataFrame for display.
spark.read.parquet(pq_folder + "flight_v1_1a").where(col('fromCity')=="Chengdu").limit(2).toPandas()

Unnamed: 0,price,version,searchDate,tableName,task_id,currencyCode,fromCity,toCity,trip,depDate,retDate,stayDays,departureTime,arrivalTime,departureTime_leg2,arrivalTime_leg2,airlineName,airlineName_leg2,duration_m,duration_m_leg2,flight_code,plane,stops,stops_leg2,stop_airport,stop_duration,stop_airport_leg2,stop_duration_leg2,noOfTicketsLeft,noOfTicketsLeft_leg2,airline_code,airline_codes,airline_codes_leg2,url,fromCityAirportCode,toCityAirportCode,fromCityAirportCode_leg2,toCityAirportCode_leg2,carrierAirProviderId,carrierAirlineImageFileName,carrierMixedCabinClass,carrierMultiStop,carrierNextDayArrival,carrierAirProviderId_leg2,carrierAirlineImageFileName_leg2,carrierMixedCabinClass_leg2,carrierMultiStop_leg2,carrierNextDayArrival_leg2,timeline_departureAirport_cityState,timeline_departureAirport_city,timeline_departureAirport_code,timeline_departureAirport_localName,timeline_departureAirport_longName,timeline_departureAirport_name,timeline_departureTime,timeline_arrivalAirport_cityState,timeline_arrivalAirport_city,timeline_arrivalAirport_code,timeline_arrivalAirport_localName,timeline_arrivalAirport_longName,timeline_arrivalAirport_name,timeline_arrivalTime,timeline_distance,timeline_plane,timeline_brandedFareName,timeline_type,timeline_departureAirport_cityState_leg2,timeline_departureAirport_city_leg2,timeline_departureAirport_code_leg2,timeline_departureAirport_localName_leg2,timeline_departureAirport_longName_leg2,timeline_departureAirport_name_leg2,timeline_departureTime_leg2,timeline_arrivalAirport_cityState_leg2,timeline_arrivalAirport_city_leg2,timeline_arrivalAirport_code_leg2,timeline_arrivalAirport_localName_leg2,timeline_arrivalAirport_longName_leg2,timeline_arrivalAirport_name_leg2,timeline_arrivalTime_leg2,timeline_distance_leg2,timeline_plane_leg2,timeline_brandedFareName_leg2,timeline_type_leg2
0,848.54,1.1,2017-05-15,flight_10_1_price,94,AUD,Chengdu,sydney,2,2017-06-03,2017-06-24,21,2017-06-03T22:20:00.000+08:00,2017-06-04T19:40:00.000+10:00,2017-06-24T10:45:00.000+10:00,2017-06-25T10:30:00.000+08:00,China Southern Airlines,China Southern Airlines,1280.0,1425.0,CZ3428,Boeing 737-800,1,1,"[Guangzhou, China (CAN-Baiyun Intl.)]",[7h:35m],"[Guangzhou, China (CAN-Baiyun Intl.)]",[13h:30m],2,2,CZ,"[CZ, CZ]","[CZ, CZ]",,CTU,SYD,SYD,CTU,7,CZ.gif,False,True,True,7,CZ.gif,False,True,True,"[Chengdu, China, Guangzhou, China]","[Chengdu, Guangzhou]","[CTU, CAN]","[Shuangliu Intl., Baiyun Intl.]","[Chengdu, China (CTU-Shuangliu Intl.), Guangzh...","[Chengdu (CTU), Guangzhou (CAN)]","[2017-06-03T22:20:00.000+08:00, 2017-06-04T08:...","[Guangzhou, China, Sydney, NSW]","[Guangzhou, Sydney]","[CAN, SYD]","[Baiyun Intl., Kingsford Smith Intl.]","[Guangzhou, China (CAN-Baiyun Intl.), Sydney, ...","[Guangzhou (CAN), Sydney (Kingsford Smith Intl.)]","[2017-06-04T00:50:00.000+08:00, 2017-06-04T19:...","[0, 0]","[Boeing 737-800, Airbus A330]","[, ]","[Segment, Segment]","[Sydney, NSW, Guangzhou, China]","[Sydney, Guangzhou]","[SYD, CAN]","[Kingsford Smith Intl., Baiyun Intl.]","[Sydney, NSW (SYD-Kingsford Smith Intl.), Guan...","[Sydney (Kingsford Smith Intl.), Guangzhou (CAN)]","[2017-06-24T10:45:00.000+10:00, 2017-06-25T08:...","[Guangzhou, China, Chengdu, China]","[Guangzhou, Chengdu]","[CAN, CTU]","[Baiyun Intl., Shuangliu Intl.]","[Guangzhou, China (CAN-Baiyun Intl.), Chengdu,...","[Guangzhou (CAN), Chengdu (CTU)]","[2017-06-24T18:30:00.000+08:00, 2017-06-25T10:...","[0, 0]","[Airbus A330, Airbus A330]","[, ]","[Segment, Segment]"
1,848.54,1.1,2017-05-15,flight_10_1_price,94,AUD,Chengdu,sydney,2,2017-06-03,2017-06-24,21,2017-06-03T21:25:00.000+08:00,2017-06-04T19:40:00.000+10:00,2017-06-24T10:45:00.000+10:00,2017-06-25T09:30:00.000+08:00,China Southern Airlines,China Southern Airlines,1335.0,1365.0,CZ3484,AIRBUS INDUSTRIE A330-300,1,1,"[Guangzhou, China (CAN-Baiyun Intl.)]",[8h:45m],"[Guangzhou, China (CAN-Baiyun Intl.)]",[12h:35m],4,4,CZ,"[CZ, CZ]","[CZ, CZ]",,CTU,SYD,SYD,CTU,7,CZ.gif,False,True,True,7,CZ.gif,False,True,True,"[Chengdu, China, Guangzhou, China]","[Chengdu, Guangzhou]","[CTU, CAN]","[Shuangliu Intl., Baiyun Intl.]","[Chengdu, China (CTU-Shuangliu Intl.), Guangzh...","[Chengdu (CTU), Guangzhou (CAN)]","[2017-06-03T21:25:00.000+08:00, 2017-06-04T08:...","[Guangzhou, China, Sydney, NSW]","[Guangzhou, Sydney]","[CAN, SYD]","[Baiyun Intl., Kingsford Smith Intl.]","[Guangzhou, China (CAN-Baiyun Intl.), Sydney, ...","[Guangzhou (CAN), Sydney (Kingsford Smith Intl.)]","[2017-06-03T23:40:00.000+08:00, 2017-06-04T19:...","[0, 0]","[AIRBUS INDUSTRIE A330-300, Airbus A330]","[, ]","[Segment, Segment]","[Sydney, NSW, Guangzhou, China]","[Sydney, Guangzhou]","[SYD, CAN]","[Kingsford Smith Intl., Baiyun Intl.]","[Sydney, NSW (SYD-Kingsford Smith Intl.), Guan...","[Sydney (Kingsford Smith Intl.), Guangzhou (CAN)]","[2017-06-24T10:45:00.000+10:00, 2017-06-25T07:...","[Guangzhou, China, Chengdu, China]","[Guangzhou, Chengdu]","[CAN, CTU]","[Baiyun Intl., Shuangliu Intl.]","[Guangzhou, China (CAN-Baiyun Intl.), Chengdu,...","[Guangzhou (CAN), Chengdu (CTU)]","[2017-06-24T18:30:00.000+08:00, 2017-06-25T09:...","[0, 0]","[Airbus A330, Boeing 737-800]","[, ]","[Segment, Segment]"


In [41]:
# Pull the first two rows of the flight_v1_1a parquet dataset into pandas.
# In the recorded run this raised ParquetDecodingException caused by
# "[B cannot be cast to java.lang.Long": a value stored as binary was decoded
# into a column that Spark typed as long.
# NOTE(review): presumably the part files of this dataset disagree on a
# column's type -- confirm by comparing the schemas of the individual files.
spark.read.parquet(pq_folder + "flight_v1_1a").limit(2).toPandas()

Py4JJavaError: An error occurred while calling o2965.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 66.0 failed 1 times, most recent failure: Lost task 0.0 in stage 66.0 (TID 23919, localhost, executor driver): org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/home/ubuntu/s3/pq_v1_1/flight_v1_1a/part-00000-9d6b9c4b-0f6b-4802-9ed6-20afecb04c96.snappy.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:184)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
	at org.apache.spark.sql.catalyst.expressions.MutableLong.update(SpecificInternalRow.scala:149)
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:240)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:159)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:323)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:371)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
	... 21 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2768)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2765)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/home/ubuntu/s3/pq_v1_1/flight_v1_1a/part-00000-9d6b9c4b-0f6b-4802-9ed6-20afecb04c96.snappy.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:184)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
	at org.apache.spark.sql.catalyst.expressions.MutableLong.update(SpecificInternalRow.scala:149)
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:240)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:159)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:323)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:371)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
	... 21 more


In [40]:
# Pull two rows with toCity == "beijing" from the flight_v1_1a parquet dataset.
# In the recorded run this raised ParquetDecodingException caused by
# "[B cannot be cast to java.lang.Long" (binary value decoded into a long column).
# NOTE(review): looks like a column-type mismatch inside the dataset -- confirm
# against the schema of the part file named in the traceback.
spark.read.parquet(pq_folder + "flight_v1_1a").where(col('toCity')=="beijing").limit(2).toPandas()

Py4JJavaError: An error occurred while calling o2921.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 64.0 failed 1 times, most recent failure: Lost task 0.0 in stage 64.0 (TID 23917, localhost, executor driver): org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/home/ubuntu/s3/pq_v1_1/flight_v1_1a/part-00000-9d6b9c4b-0f6b-4802-9ed6-20afecb04c96.snappy.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:184)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
	at org.apache.spark.sql.catalyst.expressions.MutableLong.update(SpecificInternalRow.scala:149)
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:240)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:159)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:323)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:371)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
	... 21 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2768)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2765)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/home/ubuntu/s3/pq_v1_1/flight_v1_1a/part-00000-9d6b9c4b-0f6b-4802-9ed6-20afecb04c96.snappy.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:184)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
	at org.apache.spark.sql.catalyst.expressions.MutableLong.update(SpecificInternalRow.scala:149)
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:240)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:159)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:323)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:371)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
	... 21 more


# Create the first parquet file

In [16]:
# Folder that is handed to txtToPq as the input location.
txt_exception_folder = '/home/ubuntu/s3/comb/txt_exception/'

# Build the "flight_v1_1a" dataset under pq_folder from the files matching "*.txt".
# NOTE(review): txtToPq is defined elsewhere in the notebook; append = False
# presumably replaces an existing dataset rather than adding to it -- confirm there.
txtToPq(inputFolder = txt_exception_folder, pqFolder = pq_folder,
                    pqFileName = "flight_v1_1a", searchString = "*.txt", append = False)    

# Read the dataset back and show two rows as a quick check of the result.
spark.read.parquet(pq_folder + "flight_v1_1a").limit(2).toPandas()

Unnamed: 0,price,version,searchDate,tableName,task_id,currencyCode,fromCity,toCity,trip,depDate,retDate,stayDays,departureTime,arrivalTime,departureTime_leg2,arrivalTime_leg2,airlineName,airlineName_leg2,duration_m,duration_m_leg2,flight_code,plane,stops,stops_leg2,stop_airport,stop_duration,stop_airport_leg2,stop_duration_leg2,noOfTicketsLeft,noOfTicketsLeft_leg2,airline_code,airline_codes,airline_codes_leg2,url,fromCityAirportCode,toCityAirportCode,fromCityAirportCode_leg2,toCityAirportCode_leg2,carrierAirProviderId,carrierAirlineImageFileName,carrierMixedCabinClass,carrierMultiStop,carrierNextDayArrival,carrierAirProviderId_leg2,carrierAirlineImageFileName_leg2,carrierMixedCabinClass_leg2,carrierMultiStop_leg2,carrierNextDayArrival_leg2,timeline_departureAirport_cityState,timeline_departureAirport_city,timeline_departureAirport_code,timeline_departureAirport_localName,timeline_departureAirport_longName,timeline_departureAirport_name,timeline_departureTime,timeline_arrivalAirport_cityState,timeline_arrivalAirport_city,timeline_arrivalAirport_code,timeline_arrivalAirport_localName,timeline_arrivalAirport_longName,timeline_arrivalAirport_name,timeline_arrivalTime,timeline_distance,timeline_plane,timeline_brandedFareName,timeline_type,timeline_departureAirport_cityState_leg2,timeline_departureAirport_city_leg2,timeline_departureAirport_code_leg2,timeline_departureAirport_localName_leg2,timeline_departureAirport_longName_leg2,timeline_departureAirport_name_leg2,timeline_departureTime_leg2,timeline_arrivalAirport_cityState_leg2,timeline_arrivalAirport_city_leg2,timeline_arrivalAirport_code_leg2,timeline_arrivalAirport_localName_leg2,timeline_arrivalAirport_longName_leg2,timeline_arrivalAirport_name_leg2,timeline_arrivalTime_leg2,timeline_distance_leg2,timeline_plane_leg2,timeline_brandedFareName_leg2,timeline_type_leg2
0,420.1,1.1,2017-05-10,flight_15_13_price,16203,AUD,Bangkok,Hangzhou,2,2017-05-11,2017-05-25,14,2017-05-11T22:00:00.000+07:00,2017-05-12T14:25:00.000+08:00,2017-05-25T15:20:00.000+08:00,2017-05-25T21:00:00.000+07:00,Air Macau Company,Air Macau Company,985.0,340.0,NX881,Airbus A321,1,1,"[Macau, Macau (MFM-Macau Intl.)]",[10h:25m],"[Macau, Macau (MFM-Macau Intl.)]",[1h:20m],99,99,NX,"[NX, NX]","[NX, NX]",https://www.expedia.com.au/Flights-Search?mode...,BKK,HGH,HGH,BKK,7,NX.gif,False,True,True,7,NX.gif,False,True,False,"[Bangkok, Thailand, Macau, Macau]","[Bangkok, Macau]","[BKK, MFM]","[Suvarnabhumi Intl., Macau Intl.]","[Bangkok, Thailand (BKK-Suvarnabhumi Intl.), M...","[Bangkok (Suvarnabhumi Intl.), Macau (MFM)]","[2017-05-11T22:00:00.000+07:00, 2017-05-12T12:...","[Macau, Macau, Hangzhou, China]","[Macau, Hangzhou]","[MFM, HGH]","[Macau Intl., Xiaoshan Intl.]","[Macau, Macau (MFM-Macau Intl.), Hangzhou, Chi...","[Macau (MFM), Hangzhou (HGH)]","[2017-05-12T01:45:00.000+08:00, 2017-05-12T14:...","[0, 0]","[Airbus A321, Airbus A321]","[, ]","[Segment, Segment]","[Hangzhou, China, Macau, Macau]","[Hangzhou, Macau]","[HGH, MFM]","[Xiaoshan Intl., Macau Intl.]","[Hangzhou, China (HGH-Xiaoshan Intl.), Macau, ...","[Hangzhou (HGH), Macau (MFM)]","[2017-05-25T15:20:00.000+08:00, 2017-05-25T19:...","[Macau, Macau, Bangkok, Thailand]","[Macau, Bangkok]","[MFM, BKK]","[Macau Intl., Suvarnabhumi Intl.]","[Macau, Macau (MFM-Macau Intl.), Bangkok, Thai...","[Macau (MFM), Bangkok (Suvarnabhumi Intl.)]","[2017-05-25T17:40:00.000+08:00, 2017-05-25T21:...","[0, 0]","[Airbus A321, Airbus A321]","[, ]","[Segment, Segment]"
1,431.9,1.1,2017-05-10,flight_15_13_price,16203,AUD,Bangkok,Hangzhou,2,2017-05-11,2017-05-25,14,2017-05-11T13:35:00.000+07:00,2017-05-11T22:40:00.000+08:00,2017-05-25T08:05:00.000+08:00,2017-05-25T14:45:00.000+07:00,Xiamen Airlines,Xiamen Airlines,545.0,400.0,MF854,Boeing 737-800,1,1,"[Xiamen, China (XMN-Xiamen Intl.)]",[3h:25m],"[Xiamen, China (XMN-Xiamen Intl.)]",[2h:55m],99,99,MF,"[MF, MF]","[MF, MF]",https://www.expedia.com.au/Flights-Search?mode...,BKK,HGH,HGH,BKK,7,MF.gif,False,True,False,7,MF.gif,False,True,False,"[Bangkok, Thailand, Xiamen, China]","[Bangkok, Xiamen]","[BKK, XMN]","[Suvarnabhumi Intl., Xiamen Intl.]","[Bangkok, Thailand (BKK-Suvarnabhumi Intl.), X...","[Bangkok (Suvarnabhumi Intl.), Xiamen (XMN)]","[2017-05-11T13:35:00.000+07:00, 2017-05-11T21:...","[Xiamen, China, Hangzhou, China]","[Xiamen, Hangzhou]","[XMN, HGH]","[Xiamen Intl., Xiaoshan Intl.]","[Xiamen, China (XMN-Xiamen Intl.), Hangzhou, C...","[Xiamen (XMN), Hangzhou (HGH)]","[2017-05-11T17:45:00.000+08:00, 2017-05-11T22:...","[0, 0]","[Boeing 737-800, Boeing 737-800]","[, ]","[Segment, Segment]","[Hangzhou, China, Xiamen, China]","[Hangzhou, Xiamen]","[HGH, XMN]","[Xiaoshan Intl., Xiamen Intl.]","[Hangzhou, China (HGH-Xiaoshan Intl.), Xiamen,...","[Hangzhou (HGH), Xiamen (XMN)]","[2017-05-25T08:05:00.000+08:00, 2017-05-25T12:...","[Xiamen, China, Bangkok, Thailand]","[Xiamen, Bangkok]","[XMN, BKK]","[Xiamen Intl., Suvarnabhumi Intl.]","[Xiamen, China (XMN-Xiamen Intl.), Bangkok, Th...","[Xiamen (XMN), Bangkok (Suvarnabhumi Intl.)]","[2017-05-25T09:35:00.000+08:00, 2017-05-25T14:...","[0, 0]","[Boeing 737-800, Boeing 737-800]","[, ]","[Segment, Segment]"


In [32]:
from os.path import join
from os import listdir, rmdir
from shutil import move


def unzip_files(dir_in, dir_out, extension):
    """Extract every archive found under ``dir_in`` into ``dir_out``.

    ``dir_in`` is walked recursively. Each file whose name ends with
    ``extension`` is opened as a zip archive and all of its members are
    extracted into the single directory ``dir_out``.

    The process working directory is not changed by this function.

    @params:
        dir_in      - Required  : root directory to search for archives (Str)
        dir_out     - Required  : directory receiving the extracted members;
                                  a relative path is resolved against dir_in (Str)
        extension   - Required  : file-name suffix that selects archives, e.g. ".zip" (Str)

    Returns None. Errors raised by ``zipfile`` for an unreadable or invalid
    archive are propagated to the caller.
    """
    # Resolve both locations once instead of calling os.chdir(): changing the
    # working directory is a process-wide side effect, and it made a relative
    # dir_in be walked relative to itself (so nothing was found).
    source_root = os.path.abspath(dir_in)
    # os.path.join() returns dir_out unchanged when it is already absolute.
    target_dir = os.path.join(source_root, dir_out)

    for subdir, _dirs, files in os.walk(source_root):
        for item in files:
            if item.endswith(extension):
                archive_path = os.path.join(subdir, item)
                # The context manager closes the archive even if extraction fails.
                with zipfile.ZipFile(archive_path) as zip_ref:
                    zip_ref.extractall(target_dir)

                
def clear_folder(folder):
    """Delete the regular files directly inside ``folder``.

    Only entries for which ``os.path.isfile`` is true are removed;
    sub-directories and their contents are left in place. If ``folder`` does
    not exist it is created, so the folder exists when the function returns.

    @params:
        folder      - Required  : path of the directory to empty (Str)

    Returns None. A file that cannot be deleted is reported with ``print``
    and skipped; the remaining files are still processed.
    """
    # Make sure the folder exists before listing it. This check used to sit
    # inside the loop, where it ran once per file and never for a missing folder.
    if not os.path.exists(folder):
        os.makedirs(folder)
        return

    for the_file in os.listdir(folder):
        file_path = os.path.join(folder, the_file)
        try:
            if os.path.isfile(file_path):
                os.unlink(file_path)
        except OSError as e:
            # Best effort: report the failure and carry on with the next file.
            print(e)

            
# Print iterations progress
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█'):
    """
    Call in a loop to print a text progress bar (one line per call).
    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)

    A ``total`` of 0 is reported as 100% instead of raising ZeroDivisionError.
    The bar is clamped to ``length`` characters, so an ``iteration`` beyond
    ``total`` shows a full bar; the percentage itself is not clamped.
    """
    if total:
        fraction = iteration / float(total)
        filledLength = int(length * iteration // total)
    else:
        # Nothing to do counts as done.
        fraction = 1.0
        filledLength = length
    # Keep the bar exactly `length` characters wide.
    filledLength = max(0, min(length, filledLength))
    percent = ("{0:." + str(decimals) + "f}").format(100 * fraction)
    bar = fill * filledLength + '-' * (length - filledLength)
    print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix))
    # Print New Line on Complete
    if iteration == total:
        print()

In [17]:
import boto3 
# Name of the S3 bucket that holds the zipped flight-price results.
bucket_name = 'flight.price.11'

# Resource API: used below to iterate over the objects in the bucket.
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)

# Client API: used below to download individual objects.
s3_client = boto3.client("s3")

In [None]:
# Count the objects in the S3 bucket; the result is used as the progress-bar total in the next cell
! aws s3 ls s3://flight.price.11/ --recursive | wc -l    
    

### Get list of all zip files

In [52]:
from time import sleep
from math import floor

# Collect the key of every object in the bucket, printing progress on the way.
i = 0
l = 11147 # expected number of objects, from the `aws s3 ls ... | wc -l` cell above

s3_files = list()

# Refresh the bar about every 0.5% of the expected total. The step must never
# be 0 (it would be for l < 200), otherwise the modulo below divides by zero.
progress_step = max(1, l // 200)

# Initial call to print 0% progress
printProgressBar(i, l, prefix = 'Progress:', suffix = 'Complete', decimals = 1, length = 50)

# No artificial sleep per object: listing ~11k keys needs no throttling, and
# 0.1 s per key only added many minutes of idle time.
for item in bucket.objects.all():
    # define s3 file name
    s3_file = item.key
    s3_files.append(s3_file)

    # Update Progress Bar
    i += 1
    if i % progress_step == 0:
        printProgressBar(i, l, prefix = 'Progress:', suffix = 'Complete', decimals = 1, length = 50)

Progress: |--------------------------------------------------| 0.0% Complete
Progress: |--------------------------------------------------| 0.5% Complete
Progress: |--------------------------------------------------| 1.0% Complete
Progress: |--------------------------------------------------| 1.5% Complete
Progress: |--------------------------------------------------| 2.0% Complete
Progress: |█-------------------------------------------------| 2.5% Complete
Progress: |█-------------------------------------------------| 3.0% Complete
Progress: |█-------------------------------------------------| 3.5% Complete
Progress: |█-------------------------------------------------| 3.9% Complete
Progress: |██------------------------------------------------| 4.4% Complete
Progress: |██------------------------------------------------| 4.9% Complete
Progress: |██------------------------------------------------| 5.4% Complete
Progress: |██------------------------------------------------| 5.9% Complete

In [59]:
import csv

# Persist the collected S3 keys so the bucket does not have to be listed again.
# Note: all keys are written as ONE fully-quoted CSV row, not one key per line.
with open('/home/ubuntu/work/flight/zip_files.csv', 'w') as myfile:
    csv.writer(myfile, quoting=csv.QUOTE_ALL).writerow(s3_files)
    

In [64]:
import csv
# Load the saved key list back. The file holds a single row, so the result is
# a list containing one inner list with all keys.
with open('/home/ubuntu/work/flight/zip_files.csv', 'r') as f:
    your_list = list(csv.reader(f))
your_list

[['flight_10_1/flight_10_1_price_2017-05-15.zip',
  'flight_10_1/flight_10_1_price_2017-05-16.zip',
  'flight_10_1/flight_10_1_price_2017-05-17.zip',
  'flight_10_1/flight_10_1_price_2017-05-18.zip',
  'flight_10_1/flight_10_1_price_2017-05-19.zip',
  'flight_10_1/flight_10_1_price_2017-05-20.zip',
  'flight_10_1/flight_10_1_price_2017-05-21.zip',
  'flight_10_1/flight_10_1_price_2017-05-22.zip',
  'flight_10_1/flight_10_1_price_2017-05-23.zip',
  'flight_10_1/flight_10_1_price_2017-05-24.zip',
  'flight_10_1/flight_10_1_price_2017-05-25.zip',
  'flight_10_1/flight_10_1_price_2017-05-26.zip',
  'flight_10_1/flight_10_1_price_2017-05-27.zip',
  'flight_10_1/flight_10_1_price_2017-05-28.zip',
  'flight_10_1/flight_10_1_price_2017-05-29.zip',
  'flight_10_1/flight_10_1_price_2017-05-30.zip',
  'flight_10_1/flight_10_1_price_2017-05-31.zip',
  'flight_10_1/flight_10_1_price_2017-06-01.zip',
  'flight_10_1/flight_10_1_price_2017-06-02.zip',
  'flight_10_1/flight_10_1_price_2017-06-03.zip',


In [None]:
# Process the bucket one archive at a time:
# download -> unzip -> flatten 'final_results' -> append to parquet.
# zip_folder, txt_folder, pq_folder, txt_new_exception_folder and txtToPq are
# defined in other cells of this notebook.

# Restart the counter so the bar does not continue from the listing cell.
i = 0

for item in bucket.objects.all():

    # define s3 file name
    s3_file = item.key

    # clear working folders so only the current archive is processed
    clear_folder(zip_folder)
    clear_folder(txt_folder)

    # download zip; '/' in the key is replaced so the file lands directly in
    # zip_folder, and join() works whether or not zip_folder ends with a separator
    s3_client.download_file('flight.price.11', s3_file, join(zip_folder, s3_file.replace('/', '__')))

    # extract to txt
    unzip_files(zip_folder, txt_folder, '.zip')

    # if necessary move subfolder contents to parent folder
    # (checked explicitly so the normal "no sub-folder" case prints nothing)
    nested_folder = join(txt_folder, 'final_results')
    if os.path.isdir(nested_folder):
        try:
            for filename in listdir(nested_folder):
                move(join(nested_folder, filename), join(txt_folder, filename))
            rmdir(nested_folder)
        except Exception as e:
            print(e)

    # convert to parquet and append to existing parquet
    try:
        txtToPq(inputFolder = txt_folder, pqFolder = pq_folder,
                        pqFileName = "flight_v1_1", searchString = "*.txt", append = True)
    except Exception as e:
        # Report why the conversion failed, then set the text files aside for
        # later inspection. `Exception` (not a bare except) keeps Ctrl-C working.
        print('%s: %s' % (s3_file, e))
        for filename in listdir(txt_folder):
            move(join(txt_folder, filename), join(txt_new_exception_folder, filename))

    # Update Progress Bar
    i += 1
    printProgressBar(i, l, prefix = 'Progress:', suffix = 'Complete', decimals = 2, length = 50)