In [1]:
import findspark
findspark.init()
import pyspark
import random

# find spark version by running "pyspark.version" in a cell

In [2]:
'''
    Read json file into IP dataframe
'''

from pyspark.sql import SparkSession

# spark is an existing SparkSession, not a module (I got confused with the documentation)

spark = SparkSession.builder \
    .master("local") \
    .appName("Parse_JSON") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext

path = "FEDirector_port_data.txt"

ip_df = spark.read.json(path)
ip_df.printSchema()
ip_df.show()

root
 |-- errorcode: string (nullable = true)
 |-- errormessage: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- label: string (nullable = true)
 |-- status: string (nullable = true)
 |-- storageidlist: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- errorcode: string (nullable = true)
 |    |    |-- errormessage: string (nullable = true)
 |    |    |-- fedirectorList: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- directorId: string (nullable = true)
 |    |    |    |    |-- errorcode: string (nullable = true)
 |    |    |    |    |-- errordesc: string (nullable = true)
 |    |    |    |    |-- metrics: string (nullable = true)
 |    |    |    |    |-- portMetricDataList: array (nullable = true)
 |    |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |--

In [3]:
from pyspark.sql import functions
from pyspark.sql.functions import explode

ip_df.select("ip").show()
ip_df.select("storageidlist").show()

# Drop columns errorcode and errormessage
ip_df = ip_df.drop("errorcode", "errormessage")

+------------+
|          ip|
+------------+
|10.206.88.44|
+------------+

+--------------------+
|       storageidlist|
+--------------------+
|[[,, [[FA-1E,,,, ...|
+--------------------+



In [4]:
'''
    Select storageid
'''

from pyspark.sql.functions import explode

ip_df = ip_df.withColumn("storageid_flat",explode(ip_df.storageidlist.storageid))
ip_df.show()

+------------+---------------+-------+--------------------+------------------+--------------------+--------------+
|          ip|          label| status|       storageidlist|          sublabel|                  ts|storageid_flat|
+------------+---------------+-------+--------------------+------------------+--------------------+--------------+
|10.206.88.44|emc_performence|Success|[[,, [[FA-1E,,,, ...|FEDirectorPortData|2021-01-19 13:36:...|  000295700670|
+------------+---------------+-------+--------------------+------------------+--------------------+--------------+



In [5]:

'''
    Select directorid
'''

ip_df = ip_df.withColumn("directorid_list", explode(ip_df.storageidlist.fedirectorList))

ip_df = ip_df.withColumn("directorId_flat", explode(ip_df.directorid_list.directorId))

ip_df.show()

+------------+---------------+-------+--------------------+------------------+--------------------+--------------+--------------------+---------------+
|          ip|          label| status|       storageidlist|          sublabel|                  ts|storageid_flat|     directorid_list|directorId_flat|
+------------+---------------+-------+--------------------+------------------+--------------------+--------------+--------------------+---------------+
|10.206.88.44|emc_performence|Success|[[,, [[FA-1E,,,, ...|FEDirectorPortData|2021-01-19 13:36:...|  000295700670|[[FA-1E,,,, [[[[[...|          FA-1E|
|10.206.88.44|emc_performence|Success|[[,, [[FA-1E,,,, ...|FEDirectorPortData|2021-01-19 13:36:...|  000295700670|[[FA-1E,,,, [[[[[...|          FA-2E|
|10.206.88.44|emc_performence|Success|[[,, [[FA-1E,,,, ...|FEDirectorPortData|2021-01-19 13:36:...|  000295700670|[[FA-1E,,,, [[[[[...|          FA-3E|
|10.206.88.44|emc_performence|Success|[[,, [[FA-1E,,,, ...|FEDirectorPortData|2021-01-19

In [6]:
'''
    Select portMetricDataList
'''

ip_df = ip_df.withColumn("portMetricDataList", explode(ip_df.directorid_list.portMetricDataList))

# Drop columns status, label, sublabel, storageidlist, directorid_list
ip_df = ip_df.drop("status","label","sublabel","storageidlist","directorid_list")
ip_df.show()

+------------+--------------------+--------------+---------------+--------------------+
|          ip|                  ts|storageid_flat|directorId_flat|  portMetricDataList|
+------------+--------------------+--------------+---------------+--------------------+
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|
|10.206.88.44|2021-01-19 13:36:.

In [7]:
'''
    Split portMetricDataList portMetricData-1 into portMetricData-2
'''

# Drop columns status, label, sublabel, storageidlist, directorid_list
ip_df = ip_df.drop("status","label","sublabel","storageidlist","directorid_list")

ip_df = ip_df.withColumn("portMetricData_1", explode(ip_df.portMetricDataList.getItem(0))).withColumn("portMetricData_2", explode(ip_df.portMetricDataList.getItem(1)))
ip_df.show()

+------------+--------------------+--------------+---------------+--------------------+--------------------+--------------------+
|          ip|                  ts|storageid_flat|directorId_flat|  portMetricDataList|    portMetricData_1|    portMetricData_2|
+------------+--------------------+--------------+---------------+--------------------+--------------------+--------------------+
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|[[[2021-01-19 13:...|[[[2021-01-19 13:...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|[[[2021-01-19 13:...|[[[2021-01-19 13:...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|[[[2021-01-19 13:...|[[[2021-01-19 13:...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|[[[2021-01-19 13:...|[[[2021-01-19 13:...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|[[[

In [8]:
'''
    Select metricid
'''

ip_df= ip_df.withColumn("metricid_1",ip_df.portMetricData_1.metricid).withColumn("metricid_2",ip_df.portMetricData_2.metricid)
ip_df.show()

+------------+--------------------+--------------+---------------+--------------------+--------------------+--------------------+-----------+-----------+
|          ip|                  ts|storageid_flat|directorId_flat|  portMetricDataList|    portMetricData_1|    portMetricData_2| metricid_1| metricid_2|
+------------+--------------------+--------------+---------------+--------------------+--------------------+--------------------+-----------+-----------+
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|[[[2021-01-19 13:...|[[[2021-01-19 13:...|PercentBusy|PercentBusy|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|[[[2021-01-19 13:...|[[[2021-01-19 13:...|PercentBusy|PercentBusy|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-01-19 1...|[[[2021-01-19 13:...|[[[2021-01-19 13:...|PercentBusy|PercentBusy|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|[[[[[2021-

In [9]:
'''
    Select value and ts
'''

ip_df= ip_df.withColumn("data_1_1",ip_df.portMetricData_1.data.getItem(0)).withColumn("data_1_2",ip_df.portMetricData_1.data.getItem(1)).withColumn("data_2_1",ip_df.portMetricData_1.data.getItem(0)).withColumn("data_2_2",ip_df.portMetricData_2.data.getItem(1))

# Drop portMetricData_1, portMetricData_2
ip_df = ip_df.drop("portMetricData_1","portMetricData_2")

ip_df = ip_df.withColumn("value_1_1",ip_df.data_1_1.value).withColumn("value_1_2",ip_df.data_1_2.value).withColumn("ts_1_1",ip_df.data_1_1.ts).withColumn("ts_1_2",ip_df.data_1_2.ts)

# Drop portMetricDataList, data_1_1, data_1_2, data_2_1, data_2_2)
ip_df = ip_df.drop("portMetricDataList", "data_1_1", "data_1_2", "data_2_1", "data_2_2")

ip_df.show()


+------------+--------------------+--------------+---------------+-----------+-----------+------------+------------+--------------------+--------------------+
|          ip|                  ts|storageid_flat|directorId_flat| metricid_1| metricid_2|   value_1_1|   value_1_2|              ts_1_1|              ts_1_2|
+------------+--------------------+--------------+---------------+-----------+-----------+------------+------------+--------------------+--------------------+
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|PercentBusy|PercentBusy|0.0031689804| 0.003317992|2021-01-19 13:25:...|2021-01-19 13:30:...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|PercentBusy|PercentBusy|   16.158463|   12.181889|2021-01-19 13:25:...|2021-01-19 13:30:...|
|10.206.88.44|2021-01-19 13:36:...|  000295700670|          FA-1E|PercentBusy|PercentBusy|     2.48304|   5.6656494|2021-01-19 13:25:...|2021-01-19 13:30:...|
|10.206.88.44|2021-01-19 13:36:...|  000295700

In [10]:
'''
    Write dataframe to csv
'''

import os
import tempfile


ip_df.write.csv("Parse_JSON.csv")

In [11]:
sc.stop()