In [None]:
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, to_json, col, udf, explode, lit, coalesce
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json

In [None]:
# Spark session for this notebook, running against the 'local' master.
# getOrCreate() hands back an already-running session if there is one.
spark = (
    SparkSession.builder
    .master('local')
    .appName('json_test')
    .getOrCreate()
)
# SparkContext that backs the session above.
sc = spark.sparkContext

In [None]:
def find_repo_dir(start: Path) -> Path:
    """Return the repository root for a notebook started in ``start``.

    The notebook is expected to be launched either from the repository root
    or from its ``src`` sub-directory.  Only a trailing ``src`` path
    *component* is removed; a plain string replace of ``'/src'`` would also
    mangle unrelated parts of the path (``/home/me/src/repo``,
    ``/srv/srcfiles``) and would never match Windows-style separators.

    Args:
        start: Absolute directory the notebook runs in.

    Returns:
        ``start.parent`` when ``start`` is named ``src``, otherwise ``start``.
    """
    return start.parent if start.name == 'src' else start


# Working directory of the kernel, as an absolute path.
pwd = Path().resolve()
repo_dir = find_repo_dir(pwd)

# Input files live under <repo>/data; Spark's reader is given plain strings.
data_dir = repo_dir / 'data'
clp_path = str(data_dir / 'clp-places.json')
cogo_path = str(data_dir / 'cogo-colpnts.json')
dats_path = str(data_dir / 'dats-places.json')
okay_path = str(data_dir / 'okay-places.json')
spar_path = str(data_dir / 'spar-places.json')

# Directory that receives the notebook's log file.
log_path = repo_dir / 'logs'

In [None]:
import logging

# Console logging at INFO via the root logger.
logging.basicConfig(level='INFO')

logger = logging.getLogger(__name__)
# basicConfig() does nothing when the root logger already has handlers, so the
# level is also set on this logger to make sure INFO records are not dropped.
logger.setLevel(logging.INFO)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# FileHandler does not create missing directories, so make sure it exists.
Path(log_path).mkdir(parents=True, exist_ok=True)
log_file = str(Path(log_path) / 'assignment2.log')

# Re-running this cell must not stack file handlers on the same logger,
# otherwise every record is written once per earlier run of the cell.
for old_handler in list(logger.handlers):
    if isinstance(old_handler, logging.FileHandler):
        logger.removeHandler(old_handler)
        old_handler.close()

# mode='w' truncates the log file each time the cell is executed.
handler = logging.FileHandler(filename=log_file, mode='w')
handler.setFormatter(formatter)
logger.addHandler(handler)

logger.info('Log test')


In [None]:
# Read the five source files into DataFrames; the schema of each is inferred
# by Spark from the file content.
# The former `.options(Header=True)` was dropped: `header` is an option of the
# CSV reader and has no meaning for the JSON source.
# NOTE(review): this assumes one JSON object per line; if a file is a single
# pretty-printed document it needs `.option('multiLine', True)` — TODO confirm.
clp_df = spark.read.json(clp_path)
cogo_df = spark.read.json(cogo_path)
dats_df = spark.read.json(dats_path)
okay_df = spark.read.json(okay_path)
spar_df = spark.read.json(spar_path)

In [None]:
# Number of rows read from the CLP places file.
clp_df.count()
# Disabled inspection helpers. "logger.infoSchema" looks like a search-and-replace
# casualty of printSchema, so the intended call is spelled out here:
# clp_df.printSchema()
# okay_df.show(1, 0)  # NOTE(review): shows okay_df inside the CLP cell — probably meant clp_df

In [None]:
# Number of rows read from the COGO collection-points file.
cogo_df.count()
# Disabled inspection helpers. "logger.infoSchema" looks like a search-and-replace
# casualty of printSchema, so the intended call is spelled out here:
# cogo_df.printSchema()
# cogo_df.show(2, 0)

In [None]:
# Number of rows read from the DATS places file.
dats_df.count()
# Disabled inspection helpers. "logger.infoSchema" looks like a search-and-replace
# casualty of printSchema, so the intended call is spelled out here:
# dats_df.printSchema()
# okay_df.show(2, 0)  # NOTE(review): shows okay_df inside the DATS cell — probably meant dats_df

In [None]:
# Number of rows read from the OKAY places file.
okay_df.count()
# Disabled inspection helpers. "logger.infoSchema" looks like a search-and-replace
# casualty of printSchema, so the intended call is spelled out here:
# okay_df.printSchema()
# okay_df.show(2, 0)

In [None]:
# Number of rows read from the SPAR places file.
spar_df.count()
# Disabled inspection helper. "logger.infoSchema" looks like a search-and-replace
# casualty of printSchema, so the intended call is spelled out here:
# spar_df.printSchema()


In [None]:
# Struct with two nullable string fields, 'from' and 'till' — the fields that
# are later read from each temporaryClosures entry.
# NOTE(review): not referenced by any cell visible in this part of the notebook.
temporaryClosures_schema = (
    StructType()
    .add('from', StringType())
    .add('till', StringType())
)

In [None]:
# Split the CLP rows by whether temporaryClosures holds at least one entry.
# The two filters are exact complements: a row whose temporaryClosures is null
# (e.g. the key is missing in the JSON record) lands in the "not exists" part.
# Comparing the string cast against '[]' evaluated to NULL for such rows in
# both filters, so they silently disappeared from the pipeline.
clp_tmp_exists = clp_df.filter("size(temporaryClosures) > 0")
clp_tmp_not_exists = clp_df.filter(
    "temporaryClosures is null or size(temporaryClosures) = 0"
)
logger.info('clp rows with temporaryClosures: %d', clp_tmp_exists.count())
logger.info('clp rows without temporaryClosures: %d', clp_tmp_not_exists.count())

In [None]:
# Rows that have closures: explode() yields one row per temporaryClosures entry,
# the column then holds a single struct instead of an array.
clp_tmp_exists_exp = clp_tmp_exists.withColumn(
    'temporaryClosures', explode(col('temporaryClosures'))
)
logger.info(clp_tmp_exists_exp.count())

# Promote the struct fields to top-level columns and drop the struct itself.
clp_tmp_exists_exp = (
    clp_tmp_exists_exp
    .withColumn('temporaryClosure_from', col('temporaryClosures').getField('from'))
    .withColumn('temporaryClosure_till', col('temporaryClosures').getField('till'))
    .drop('temporaryClosures')
)

# Rows without closures get the same two columns, filled with empty strings,
# so both frames end up with the same column layout.
clp_tmp_not_exists = (
    clp_tmp_not_exists
    .withColumn('temporaryClosure_from', lit(''))
    .withColumn('temporaryClosure_till', lit(''))
    .drop('temporaryClosures')
)

In [None]:
# Recombine the CLP rows with and without closures. Both frames derive from
# clp_df and received the same column changes, so the positional union lines up.
clp_union_df = clp_tmp_exists_exp.union(clp_tmp_not_exists)

# handoverServices stays an array column: the explode step for it is disabled.
clp_final_df = clp_union_df

# One count only — the earlier stand-alone clp_union_df.count() ran a full Spark
# job whose result was neither displayed (not the cell's last expression) nor logged.
logger.info(clp_final_df.count())

In [None]:
# COGO rows get the two closure columns as empty strings.
# NOTE(review): this presumes COGO never carries temporaryClosures entries — verify against the data.
cogo_tmp_df = (
    cogo_df
    .withColumn('temporaryClosure_from', lit(''))
    .withColumn('temporaryClosure_till', lit(''))
)
cogo_tmp_df.count()

In [None]:
# Remove the original temporaryClosures column; the two temporaryClosure_*
# columns added to cogo_tmp_df take its place.
cogo_final_df = cogo_tmp_df.drop('temporaryClosures')
# Row count, displayed as the cell result.
cogo_final_df.count()

In [None]:
# Stack CLP and COGO. The two frames were inferred from different files, so
# columns are matched by name: a positional union() would pair up whatever
# happens to sit at the same index, and a schema difference between the files
# would silently put values into the wrong columns. unionByName() raises instead.
clp_cogo = clp_final_df.unionByName(cogo_final_df)

In [None]:
# Print the first two combined rows without truncating cell values,
# then display the total row count as the cell result.
clp_cogo.show(n=2, truncate=False)
clp_cogo.count()

In [None]:
# OKAY: flatten temporaryClosures into temporaryClosure_from / temporaryClosure_till.

# Split the rows by whether temporaryClosures holds at least one entry.
# The two filters are exact complements: a row whose temporaryClosures is null
# lands in the "not exists" part. Comparing the string cast against '[]'
# evaluated to NULL for such rows in both filters, which dropped them entirely.
okay_tmp_exists = okay_df.filter("size(temporaryClosures) > 0")
okay_tmp_not_exists = okay_df.filter(
    "temporaryClosures is null or size(temporaryClosures) = 0"
)
logger.info('okay rows with temporaryClosures: %d', okay_tmp_exists.count())
logger.info('okay rows without temporaryClosures: %d', okay_tmp_not_exists.count())

# Rows with closures: one output row per temporaryClosures entry.
okay_tmp_exists_exp = okay_tmp_exists.withColumn(
    'temporaryClosures', explode(col('temporaryClosures'))
)
logger.info('okay rows after exploding temporaryClosures: %d', okay_tmp_exists_exp.count())

# Promote the struct fields to top-level columns and drop the struct itself.
okay_tmp_exists_exp = (
    okay_tmp_exists_exp
    .withColumn('temporaryClosure_from', col('temporaryClosures.from'))
    .withColumn('temporaryClosure_till', col('temporaryClosures.till'))
    .drop('temporaryClosures')
)

# Rows without closures: same two columns, filled with empty strings.
okay_tmp_not_exists = (
    okay_tmp_not_exists
    .drop('temporaryClosures')
    .withColumn('temporaryClosure_from', lit(''))
    .withColumn('temporaryClosure_till', lit(''))
)

# Recombine both OKAY parts. They derive from okay_df and end up with the same
# column order, so the positional union lines up.
okay_union_df = okay_tmp_exists_exp.union(okay_tmp_not_exists)

# handoverServices stays an array column: the explode step for it is disabled.
okay_final_df = okay_union_df

# The intermediate stand-alone .count() calls were removed: each ran a Spark
# job whose result was neither displayed nor logged.
logger.info('okay rows after flattening temporaryClosures: %d', okay_final_df.count())

In [None]:
# Append the OKAY rows. okay_final_df comes from a separately inferred file,
# so columns are matched by name; a positional union() could silently put
# values into the wrong columns if the schemas differ, whereas unionByName()
# raises on a mismatch.
clp_cogo_okay = clp_cogo.unionByName(okay_final_df)
# Total row count, displayed as the cell result.
clp_cogo_okay.count()