# Dealing with JSON files and arrays in Spark

In [1]:
pip install pyspark


Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'C:\Users\Administrator\anaconda3\python.exe -m pip install --upgrade pip' command.


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

In [3]:
# Build the notebook's SparkSession under the application name 'oop';
# getOrCreate() hands back an already-running session if one exists.
spark = (
    SparkSession.builder
    .appName('oop')
    .getOrCreate()
)

In [6]:
# Location of the sample JSON document.
# NOTE(review): this is a machine-specific absolute path; adjust it before running elsewhere.
json_path = 'C:/Users/Administrator/test/ML/Machine-Learning/jsontest.json'

# Read the file into a DataFrame using Spark's JSON reader.
df = spark.read.json(json_path)


In [7]:
# Bring every row back to the driver, then print each Row object on its own line.
collected_rows = df.collect()
for row in collected_rows:
    print(row)

Row(results=[Row(acked=False, corrected=False, date='2019-03-10', desc='Frozen Alert on Input6', hostIp='1.11.1.13', hostIpDeviceLabel='Map Data', id='400', instance=3, instanceDesc='PQR 826__302', myHashCode=-6754, notes='', oid='1.3.6', productDeviceLabel='PNX-AD  [7]', prouct='3V', selfCorrected=False, severity=2, slot=0, time='14:20:28', trapID=110, varbinds=['ABC 826', '2ndValue', '', '4th Value', '', '', '', '', '', '', '', '', '', '', '', ''])])


In [8]:
# Print the DataFrame's schema as a tree to inspect how the `results` column is nested.
df.printSchema()

root
 |-- results: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- acked: boolean (nullable = true)
 |    |    |-- corrected: boolean (nullable = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- desc: string (nullable = true)
 |    |    |-- hostIp: string (nullable = true)
 |    |    |-- hostIpDeviceLabel: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- instance: long (nullable = true)
 |    |    |-- instanceDesc: string (nullable = true)
 |    |    |-- myHashCode: long (nullable = true)
 |    |    |-- notes: string (nullable = true)
 |    |    |-- oid: string (nullable = true)
 |    |    |-- productDeviceLabel: string (nullable = true)
 |    |    |-- prouct: string (nullable = true)
 |    |    |-- selfCorrected: boolean (nullable = true)
 |    |    |-- severity: long (nullable = true)
 |    |    |-- slot: long (nullable = true)
 |    |    |-- time: string (nullable = true)
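 |    |    |-- trapID: long (nullable = true)
 |    |    |-- varbinds: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)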

In [16]:
# Preview the DataFrame as a text table; long cell values are abbreviated with "..." in the output.
df.show()

+--------------------+
|             results|
+--------------------+
|[{false, false, 2...|
+--------------------+



In [9]:
# Turn every element of the `results` array into its own row. The exploded
# struct lands in a single column that Spark names `col`.
exploded_results = explode(df["results"])
DF = df.select(exploded_results)
DF.printSchema()

root
 |-- col: struct (nullable = true)
 |    |-- acked: boolean (nullable = true)
 |    |-- corrected: boolean (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- desc: string (nullable = true)
 |    |-- hostIp: string (nullable = true)
 |    |-- hostIpDeviceLabel: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- instance: long (nullable = true)
 |    |-- instanceDesc: string (nullable = true)
 |    |-- myHashCode: long (nullable = true)
 |    |-- notes: string (nullable = true)
 |    |-- oid: string (nullable = true)
 |    |-- productDeviceLabel: string (nullable = true)
 |    |-- prouct: string (nullable = true)
 |    |-- selfCorrected: boolean (nullable = true)
 |    |-- severity: long (nullable = true)
 |    |-- slot: long (nullable = true)
 |    |-- time: string (nullable = true)
 |    |-- trapID: long (nullable = true)
 |    |-- varbinds: array (nullable = true)
 |    |    |-- element: string (containsNull = true)



In [10]:
# Give the exploded struct column a descriptive name instead of the default `col`.
rename_from, rename_to = "col", "data"
DF = DF.withColumnRenamed(rename_from, rename_to)
DF.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- acked: boolean (nullable = true)
 |    |-- corrected: boolean (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- desc: string (nullable = true)
 |    |-- hostIp: string (nullable = true)
 |    |-- hostIpDeviceLabel: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- instance: long (nullable = true)
 |    |-- instanceDesc: string (nullable = true)
 |    |-- myHashCode: long (nullable = true)
 |    |-- notes: string (nullable = true)
 |    |-- oid: string (nullable = true)
 |    |-- productDeviceLabel: string (nullable = true)
 |    |-- prouct: string (nullable = true)
 |    |-- selfCorrected: boolean (nullable = true)
 |    |-- severity: long (nullable = true)
 |    |-- slot: long (nullable = true)
 |    |-- time: string (nullable = true)
 |    |-- trapID: long (nullable = true)
 |    |-- varbinds: array (nullable = true)
 |    |    |-- element: string (containsNull = true)



In [12]:
# Keep only the alert id and its `varbinds` array, pulled out of the `data` struct.
id_and_varbinds = ["data.id", "data.varbinds"]
DF2 = DF.select(*id_and_varbinds)

# Show both the resulting schema and the rows.
DF2.printSchema()
DF2.show()

root
 |-- id: string (nullable = true)
 |-- varbinds: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+--------------------+
| id|            varbinds|
+---+--------------------+
|400|[ABC 826, 2ndValu...|
+---+--------------------+



In [13]:
# Project the scalar alert fields out of the `data` struct into flat columns.
# The struct field is spelled `hostIp` in the schema, so it is referenced with
# that exact spelling and aliased to `hostIP`. This keeps the output column
# name unchanged without relying on case-insensitive column resolution.
DF1 = DF.select(
    "data.id",
    DF["data.hostIp"].alias("hostIP"),
    "data.slot",
    "data.prouct",
    "data.trapID",
    "data.date",
    "data.time",
)
DF1.show()

+---+---------+----+------+------+----------+--------+
| id|   hostIP|slot|prouct|trapID|      date|    time|
+---+---------+----+------+------+----------+--------+
|400|1.11.1.13|   0|    3V|   110|2019-03-10|14:20:28|
+---+---------+----+------+------+----------+--------+



In [15]:
# Combining the scalar alert fields with the varbinds array
#
# Both sets of columns come from the same exploded rows in DF, so they are
# projected in a single select rather than joined back together. An inner
# join on `id` would multiply rows whenever an id occurs more than once and
# would drop rows whose id is null. The column order matches what the join
# produced: the scalar fields first, then `varbinds`.
Final_DF = DF.select(
    "data.id",
    DF["data.hostIp"].alias("hostIP"),  # exact schema spelling, same output name
    "data.slot",
    "data.prouct",
    "data.trapID",
    "data.date",
    "data.time",
    "data.varbinds",
)
Final_DF.show()

+---+---------+----+------+------+----------+--------+--------------------+
| id|   hostIP|slot|prouct|trapID|      date|    time|            varbinds|
+---+---------+----+------+------+----------+--------+--------------------+
|400|1.11.1.13|   0|    3V|   110|2019-03-10|14:20:28|[ABC 826, 2ndValu...|
+---+---------+----+------+------+----------+--------+--------------------+

