In [1]:
import logging
from sys import stdout

# One shared format for every message emitted by this consumer.
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler(stdout)
console_handler.setFormatter(formatter)

logger = logging.getLogger('opensky.spark_consumer')
# getLogger returns the same object for the same name for the life of the kernel,
# so re-running this cell would otherwise stack another stdout handler each time
# (every message printed once per run). Swap out whatever is attached instead.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()
logger.addHandler(console_handler)
logger.setLevel('INFO')


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import pandas as pd
from pyspark.sql import types as T
from pyspark.sql import functions as F
import json
import os


# ---- Kafka topics / broker ----
topic_real_time_states = "real-time-states"
topic_sparse_states = 'sparse_states'
# NOTE(review): "rime" looks like a typo for "time"; kept verbatim because it must
# match the topic name that exists on the Kafka side -- confirm before renaming.
topic_enriched_real_time_states = 'enriched-real-rime-states'
broker = "localhost:9092"

# ---- Cluster endpoints ----
host_name = 'cnt7-naya-cdh6'
hive_host = hdfs_host = "localhost"
hdfs_port = 8020

# Pulls in the Kafka source package; has to be set before the SparkSession below
# is first created, since it configures how spark-submit is launched.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 pyspark-shell'

# Reuses an already-running session if there is one (in that case the submit args
# set above are not re-applied).
spark = (
    SparkSession.builder
    .appName("StructuredRealTimeState")
    .getOrCreate()
)


# Schema of the JSON payload in each Kafka message: an array with one struct per
# state vector (parsed with from_json and exploded further down).
# NOTE(review): field types must match what the producer serialises
# (e.g. on_ground as an integer, times as timestamps) -- confirm.
schema = T.ArrayType(
    T.StructType([
        T.StructField("time", T.TimestampType()),
        T.StructField("icao24", T.StringType()),
        T.StructField("callsign", T.StringType()),
        T.StructField("last_contact", T.TimestampType()),
        T.StructField("longitude", T.FloatType()),
        T.StructField("latitude", T.FloatType()),
        T.StructField("baro_altitude", T.FloatType()),
        T.StructField("on_ground", T.IntegerType()),
        T.StructField("velocity", T.FloatType()),
        T.StructField("geo_altitude", T.FloatType()),
        T.StructField("squawk", T.StringType()),
        T.StructField("position_source", T.IntegerType()),
    ])
)

In [5]:
import pyarrow as pa
import re
from datetime import datetime, timedelta
import urllib
from impala.dbapi import connect

def _partition_dirs_by_value(partition_paths):
    """Map each URL-decoded partition value to the directory path that holds it."""
    # A bare `import urllib` does not guarantee the `urllib.parse` submodule is loaded.
    from urllib.parse import unquote

    dirs_by_value = {}
    for partition_path in partition_paths:
        # Value = text after the last '=' in the path; entries without '='
        # (e.g. marker files) are skipped.
        match = re.fullmatch(r'.+=(.+)', partition_path)
        if match is not None:
            # Special characters in partition folder names are percent-encoded
            # (e.g. ':' appears as '%3A').
            dirs_by_value[unquote(match.group(1))] = partition_path
    return dirs_by_value


def drop_old_partitions(impala_conn, table_name: str, table_src_path: str,
                        partition_name: str, earliest_time_to_keep: datetime):
    """
    Drop every partition of <table_name> older than <earliest_time_to_keep>.

    Partitions are discovered by listing the HDFS directory <table_src_path>
    (one ``<partition_name>=<value>`` folder per partition). For each stale
    partition the Impala metadata is dropped first, then the folder is deleted.

    :param impala_conn: DB-API connection from impala.dbapi.connect. A cursor is
        opened and closed here; the connection itself is left open.
    :param table_name: Impala table whose partitions are dropped.
    :param table_src_path: HDFS directory holding the table's partition folders.
    :param partition_name: partition column used in ALTER TABLE ... DROP PARTITION.
    :param earliest_time_to_keep: partitions whose value, parsed as
        '%Y-%m-%d %H:%M:%S', is strictly earlier than this are removed.

    Partition values that cannot be parsed are skipped with a warning. A failure
    on one partition is logged and the remaining partitions are still processed.
    """
    logger.debug(f'dropping old partitions for table {table_name}, all partitions older than {earliest_time_to_keep}')

    fs = pa.hdfs.connect(
            host=hdfs_host,
            port=hdfs_port,
            user='hdfs',
            kerb_ticket=None,
            driver='libhdfs',
            extra_conf=None)
    try:
        partitions_dict = _partition_dirs_by_value(fs.ls(table_src_path))

        partitions_to_delete = []
        for part_value in partitions_dict:
            try:
                part_time = datetime.strptime(part_value, '%Y-%m-%d %H:%M:%S')
            except ValueError:
                # Presumably a NULL-value partition folder; never drop what we cannot date.
                logger.warning(f'skipping partition with unparseable value {part_value!r} in {table_src_path}')
                continue
            if part_time < earliest_time_to_keep:
                partitions_to_delete.append(part_value)

        crsr = impala_conn.cursor()
        try:
            for part_key in partitions_to_delete:
                try:
                    crsr.execute(f'alter table {table_name} drop if exists partition ({partition_name}="{part_key}");')
                    fs.delete(partitions_dict[part_key], recursive=True)
                    logger.debug(f'deleted : {partitions_dict[part_key]}')
                except Exception as ex:
                    logger.error(f'failed to drop partition {partition_name}={part_key} of {table_name}: {ex}')
        finally:
            crsr.close()
    finally:
        # A new HDFS connection is opened on every call; release it.
        fs.close()


import pyarrow as pa
from datetime import datetime, timedelta
from impala.dbapi import connect
from os import path


def _append_partitioned(batch_df, partition_col, trunc_unit, target_path):
    """Append batch_df as parquet under target_path, partitioned by `time` truncated to trunc_unit."""
    hdfs_uri = f'hdfs://{hdfs_host}:{hdfs_port}/{target_path.lstrip("/")}'
    logger.debug(f'trying to write to : {hdfs_uri}')
    (batch_df
        .withColumn(partition_col, F.date_trunc(trunc_unit, batch_df.time))
        .write
        .mode("append")
        .partitionBy(partition_col)
        .parquet(hdfs_uri))


def _refresh_impala_tables(target_database, last_hour_path, last_day_path):
    """Expire old partitions of states_last_hour/states_last_day and register new ones in Impala."""
    # NOTE(review): credentials are hardcoded; consider reading them from the environment.
    impala_conn = connect(host=host_name, database=target_database, user='naya', password='naya', auth_mechanism='NOSASL')
    try:
        now = datetime.now()
        drop_old_partitions(impala_conn, 'states_last_hour', last_hour_path,
                            'date_minute', now - timedelta(hours=1))
        drop_old_partitions(impala_conn, 'states_last_day', last_day_path,
                            'date_hour', now - timedelta(hours=24))

        impala_crsr = impala_conn.cursor()
        try:
            for table_name in ['states_last_hour', 'states_last_day']:
                # Register partition folders written by Spark, then reload file metadata.
                impala_crsr.execute(f'alter table {table_name} recover partitions;')
                impala_crsr.execute(f'refresh {table_name};')
        except Exception as ex:
            logger.error(ex)
        finally:
            impala_crsr.close()
    finally:
        # One connection is opened per micro-batch; close it so they do not pile up.
        impala_conn.close()


def write_to_tables(df: DataFrame, epoch_id):
    """
    foreachBatch handler: append one micro-batch to the rolling last-hour / last-day datasets.

    The batch is written twice as parquet on HDFS: partitioned by minute under
    .../last_hour and by hour under .../last_day. Then partitions older than
    1 hour / 24 hours are dropped, and the Impala tables states_last_hour /
    states_last_day are re-synced. Empty batches are only logged.

    :param df: micro-batch of state vectors; must have a `time` column.
    :param epoch_id: micro-batch id supplied by Spark; used only for logging.

    NOTE(review): foreachBatch is at-least-once, so a replayed epoch is appended
    again -- deduplicate on epoch_id if that matters.
    """
    target_database = 'opensky_network'

    root_data_path = '/user/naya/FinalProject/'
    last_hour_path = path.join(root_data_path, 'last_hour')
    last_day_path = path.join(root_data_path, 'last_day')

    # count() and both writes reuse the batch; persist it first so the Kafka
    # source is not re-read for every action.
    df.persist()
    try:
        batch_len = df.count()
        logger.info(f'write_to_hive: epoch_id: {epoch_id} len: {batch_len}')
        if batch_len == 0:
            return
        _append_partitioned(df, 'date_minute', 'minute', last_hour_path)
        _append_partitioned(df, 'date_hour', 'hour', last_day_path)
    finally:
        df.unpersist()

    _refresh_impala_tables(target_database, last_hour_path, last_day_path)



In [6]:
import geopandas as gpd
import matplotlib
import numpy as np
import pandas as pd
from multiprocessing import  Pool
from shapely.geometry.point import Point as ShapelyPoint
from pyspark.sql.functions import udf
import time

# Country polygons used to tag each state vector with a country name.
# Presumably the Natural Earth 1:10m admin-0 dataset (judging by the file name);
# must provide `NAME` and `geometry` columns.
countries_shapefile = './Material/ne_10m_admin_0_countries.shp'
COUNTRIES_GDF = gpd.read_file(countries_shapefile)

def get_country(lon: float, lat: float) -> str:
    """
    Return the NAME of the country in COUNTRIES_GDF whose geometry contains (lon, lat).

    :param lon: longitude (x); None yields "".
    :param lat: latitude (y); None yields "".
    :return: the country name, or "" when no geometry contains the point. If
        several geometries contain it, the first match is returned instead of
        raising (Series.item() fails unless exactly one element is present).
    """
    if lon is None or lat is None:
        return ""

    # shapely points are (x, y), i.e. (longitude, latitude)
    point = ShapelyPoint(lon, lat)
    matches = COUNTRIES_GDF['NAME'][COUNTRIES_GDF['geometry'].contains(point)]
    return matches.iloc[0] if len(matches) > 0 else ""

def to_json_cols(cols_list):
    """
    Convert each column in cols_list into a JSON-string column via F.to_json.

    The previous body discarded every F.to_json result and returned None.

    :param cols_list: column names or Column objects (F.to_json expects
        struct, array or map columns).
    :return: list of Columns, in the same order as cols_list.
    """
    return [F.to_json(col) for col in cols_list]

# Spark SQL UDF wrapping get_country; produces a string column.
udf_get_country = udf(get_country, returnType=T.StringType())


def enrich_add_country_column(df, epoch_id, sample_size=100):
    """
    foreachBatch handler (work in progress): add a `country` column to a micro-batch.

    :param df: micro-batch with `longitude` and `latitude` columns; None is ignored.
    :param epoch_id: micro-batch id supplied by foreachBatch; used only for logging.
    :param sample_size: number of rows to enrich (default 100, the former
        hard-coded limit); pass None to enrich the whole batch.
    """
    if df is None:
        return
    batch_len = df.count()
    logger.debug(f'in enrich_add_country_column epoch_id:{epoch_id}, with len(df): {batch_len}')
    if batch_len <= 0:
        return

    source_df = df if sample_size is None else df.limit(sample_size)
    enriched_df = source_df.withColumn('country', udf_get_country('longitude', 'latitude'))

    # withColumn is lazy: cache and count so the UDF actually runs inside the timed region.
    start_enrich = time.time()
    enriched_df.persist()
    try:
        enriched_len = enriched_df.count()
        end_enrich = time.time()
        logger.debug(f'enriched {enriched_len} records, within {end_enrich - start_enrich} seconds')

        # One JSON document per row in a `value` column, the shape the Kafka sink needs.
        # (rdd.toDF('value') on an RDD of str cannot infer a schema and raises.)
        kafka_df = enriched_df.select(F.to_json(F.struct(*enriched_df.columns)).alias('value'))
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'this is the df: {[row.value for row in kafka_df.take(20)]}')

        # TBD: publish to Kafka once the enrichment cost is acceptable:
        # kafka_df.write \
        #     .format("kafka") \
        #     .option("kafka.bootstrap.servers", broker) \
        #     .option("topic", topic_enriched_real_time_states) \
        #     .save()
    finally:
        enriched_df.unpersist()


In [7]:
from pyspark.sql import functions as F

# Raw Kafka records from the real-time states topic, starting at the latest offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", topic_real_time_states)
    .option("startingOffsets", "latest")
    .load()
)

# Each Kafka `value` holds a JSON array of state vectors: parse it with `schema`,
# explode to one row per state vector, then flatten the struct into top-level columns.
state_vectors_df = (
    df.select(F.from_json(F.col("value").cast("string"), schema).alias("value"))
    .select(F.explode("value").alias("value"))
    .select("value.*")
)

# Archive every state vector as parquet, partitioned by the hour of its `time` field.
# TBD - handle target path creation
# NOTE(review): this URI uses host 'cnt7-naya-cdh6.org' while hdfs_host is 'localhost' --
# confirm both point at the same NameNode.
parquet_path = 'hdfs://cnt7-naya-cdh6.org:8020/FinalProject/parquet_archive'
parquet_checkpoint_path = "/home/naya/parquet_checkpoint"
parquet_write = (
    state_vectors_df
    .withColumn('date_hour', F.date_trunc('hour', state_vectors_df['time']))
    .writeStream
    .format("parquet")
    .outputMode("append")
    .partitionBy('date_hour')
    .option("path", parquet_path)
    .option("checkpointLocation", parquet_checkpoint_path)
    .start()
)

# Micro-batch sink that maintains the rolling last-hour / last-day tables.
# NOTE(review): no checkpointLocation is set for this query -- confirm that restarts
# are meant to resume from "latest" rather than from the last processed offset.
tables_write = (
    state_vectors_df.writeStream
    .foreachBatch(write_to_tables)
    .start()
)

# TBD: country enrichment stream, disabled for now.
# enrich_with_countries = (
#     state_vectors_df.writeStream
#     .foreachBatch(enrich_add_country_column)
#     .start()
# )


2020-01-16 01:11:55,220 - opensky.spark_consumer - INFO - write_to_hive: epoch_id: 0 len: 0
2020-01-16 01:12:38,634 - opensky.spark_consumer - INFO - write_to_hive: epoch_id: 1 len: 12718
2020-01-16 01:12:54,842 - opensky.spark_consumer - INFO - write_to_hive: epoch_id: 2 len: 12710
2020-01-16 01:13:08,659 - opensky.spark_consumer - INFO - write_to_hive: epoch_id: 3 len: 12730
2020-01-16 01:13:23,377 - opensky.spark_consumer - INFO - write_to_hive: epoch_id: 4 len: 12728
2020-01-16 01:13:40,646 - opensky.spark_consumer - INFO - write_to_hive: epoch_id: 5 len: 12748
2020-01-16 01:13:56,630 - opensky.spark_consumer - INFO - write_to_hive: epoch_id: 6 len: 12729
2020-01-16 01:14:17,567 - opensky.spark_consumer - INFO - write_to_hive: epoch_id: 7 len: 12737
2020-01-16 01:14:35,260 - opensky.spark_consumer - INFO - write_to_hive: epoch_id: 8 len: 12732
2020-01-16 01:14:56,374 - opensky.spark_consumer - INFO - write_to_hive: epoch_id: 9 len: 12725
