In [13]:
# Imports: PySpark session entry point and the SQL types used to declare the schema
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, ArrayType


# Reuse the notebook's active SparkSession, or start one if none exists.
spark = SparkSession.builder.getOrCreate()

# Input and output locations. `base_path` is handed to Spark as the `basePath`
# option so that partition discovery is rooted at the sessions directory.
path = "/home/jovyan/work/data/sessions"
output = "/home/jovyan/work/data/output"
base_path = path

# Explicit, all-nullable schema: fixes column types up front and spares Spark a
# schema-inference pass over the files.
session_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('api_key', StringType()),
        ('session_key', StringType()),
        ('session_start', TimestampType()),
        ('session_end', TimestampType()),
        ('inputs', ArrayType(StringType())),
    )
])

# Load every `*.json` file exactly one directory level below `path`.
#   - DROPMALFORMED discards records that cannot be parsed against the schema.
#   - allowBackslashEscapingAnyCharacter accepts a backslash before any character,
#     not just the standard JSON escapes.
# NOTE(review): timestamp strings without an explicit offset are interpreted in the
# Spark session time zone — confirm that matches how the source data was written.
df = (
    spark.read
    .options(mode='DROPMALFORMED', basePath=base_path)
    .json(
        f'{path}/*/*.json',
        schema=session_schema,
        allowBackslashEscapingAnyCharacter=True,
    )
)

# Keep exactly the schema's columns, in schema order. This drops any extra columns
# the reader may append (e.g. partition columns if subdirectories are named key=value).
df = df.select(*session_schema.fieldNames())

# Preview the first 10 rows (string values are truncated to 20 chars by default).
df.show(10)

+--------+--------------------+-------------------+-------------------+--------------------------------+
| api_key|         session_key|      session_start|        session_end|                          inputs|
+--------+--------------------+-------------------+-------------------+--------------------------------+
|WKL6IZCP|cd57397644974f129...|2022-01-23 00:00:08|2022-01-23 07:21:28|  [产权.绝缘.墨镜, 产权.绝缘....|
|WKL6IZCP|5f958a32d55e4ac6b...|2022-01-23 07:21:30|2022-01-23 07:21:35| [导航.到新东方.国外, 导航.到...|
|WKL6IZCP|5c3c6d34e7da4b65b...|2022-01-23 07:21:36|2022-01-23 07:21:37|[到新东方.国外.学习, 到新东方...|
|WKL6IZCP|c187561c46ba4135b...|2022-01-23 07:21:48|2022-01-23 19:40:49|  [产权.绝缘.墨镜, 产权.绝缘....|
|WKL6IZCP|5ce39d743670469b8...|2022-01-23 19:40:51|2022-01-23 19:40:51|        [到江.西省.永新县吉安市]|
|WKL6IZCP|67f54c2e941f4fbb9...|2022-01-23 19:41:08|2022-01-23 23:59:49|  [产权.绝缘.墨镜, 产权.绝缘....|
|1CDQTWQ1|0f6ee4028cc1412ab...|2022-01-23 00:25:22|2022-01-23 00:25:33|            [%D9%85%D8%AB%D9%...|
|1CDQTWQ1|e4a

In [14]:
# Persist the loaded sessions as JSON (one object per line) under `output`.
#   - coalesce(1) collapses the data to a single partition, so a single part file
#     is written; convenient for small outputs, a bottleneck for large ones.
#   - mode='overwrite' replaces whatever already exists at `output`.
# NOTE(review): timestamps are serialized with the JSON writer's default
# timestampFormat, which may differ from the input's layout — set it explicitly
# if downstream consumers rely on a particular format.
df.coalesce(1).write.json(output, mode='overwrite')