In [0]:
# The goal of this project is to manipulate deeply nested json data and transform them into structured data 

In [0]:
# Loading from S3
filepath = "s3://full-stack-bigdata-datasets/Big_Data/YOUTUBE/songs.json"
df = spark.read.format('json').load(filepath, multiline=True)

In [0]:
# Tidying up
# Fixing rows
df.printSchema()
from pyspark.sql import functions as F
df.select(F.explode('items')).count()
items_df = df.select(F.explode('items').alias('items'))
items_df.count()

root
 |-- etag: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- contentDetails: struct (nullable = true)
 |    |    |    |-- caption: string (nullable = true)
 |    |    |    |-- contentRating: struct (nullable = true)
 |    |    |    |    |-- ytRating: string (nullable = true)
 |    |    |    |-- definition: string (nullable = true)
 |    |    |    |-- dimension: string (nullable = true)
 |    |    |    |-- duration: string (nullable = true)
 |    |    |    |-- licensedContent: boolean (nullable = true)
 |    |    |    |-- projection: string (nullable = true)
 |    |    |    |-- regionRestriction: struct (nullable = true)
 |    |    |    |    |-- allowed: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- blocked: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |-- etag: string (nullable = t

In [0]:
# We're making progress, we now have one row per result (e.g. song)!
# But each song is a deeply nested structure..I will take care of this .
items_df.limit(5).toPandas()

  Unable to convert the field items. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Nested StructType not supported in conversion to Arrow
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Unnamed: 0,items
0,"((false, (None,), sd, 2d, PT3M33S, True, recta..."
1,"((false, (None,), hd, 2d, PT7M46S, False, rect..."
2,"((false, (None,), sd, 2d, PT3M7S, False, recta..."
3,"((false, (None,), hd, 2d, PT3M43S, False, rect..."
4,"((false, (None,), hd, 2d, PT5M, False, rectang..."


In [0]:
df.count()


Out[4]: 100

In [0]:
df.printSchema()


root
 |-- etag: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- contentDetails: struct (nullable = true)
 |    |    |    |-- caption: string (nullable = true)
 |    |    |    |-- contentRating: struct (nullable = true)
 |    |    |    |    |-- ytRating: string (nullable = true)
 |    |    |    |-- definition: string (nullable = true)
 |    |    |    |-- dimension: string (nullable = true)
 |    |    |    |-- duration: string (nullable = true)
 |    |    |    |-- licensedContent: boolean (nullable = true)
 |    |    |    |-- projection: string (nullable = true)
 |    |    |    |-- regionRestriction: struct (nullable = true)
 |    |    |    |    |-- allowed: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- blocked: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |-- etag: string (nullable = t

In [0]:
'''
    Our schema is like a tree, we want to collect all its leaves and put them neatly as columns of our DataFrame.
That's called flattening a schema '''

Out[6]: "\n    Our schema is like a tree, we want to collect all its leaves and put them neatly as columns of our DataFrame.\nThat's called flattening a schema "

In [0]:
from pyspark.sql import functions as F

df.select('items.snippet.title').show(5)
df.withColumn('items.snippet.title', F.col('items.snippet.title'))

+--------------------+
|               title|
+--------------------+
|[VOLO. "L'air d'u...|
|[BANKS - WAITING ...|
|[OH MY DAYUM ft. ...|
|[caravan palace -...|
|[ALB - Whispers U...|
+--------------------+
only showing top 5 rows

Out[7]: DataFrame[etag: string, items: array<struct<contentDetails:struct<caption:string,contentRating:struct<ytRating:string>,definition:string,dimension:string,duration:string,licensedContent:boolean,projection:string,regionRestriction:struct<allowed:array<string>,blocked:array<string>>>,etag:string,id:string,kind:string,snippet:struct<categoryId:string,channelId:string,channelTitle:string,defaultAudioLanguage:string,defaultLanguage:string,description:string,liveBroadcastContent:string,localized:struct<description:string,title:string>,publishedAt:string,tags:array<string>,thumbnails:struct<default:struct<height:bigint,url:string,width:bigint>,high:struct<height:bigint,url:string,width:bigint>,maxres:struct<height:bigint,url:string,width:bigint>,medium:stru

In [0]:
'''What this functions does, is that it walk the schema of our DataFrame with a nested schema and harvest its leave. Returning them with full path like this items.snippet.title as a string.'''
# we'll work with the schema in json format, which will be way easier to manipulate
df.schema.jsonValue()
df.schema.jsonValue().keys()
# Only two keys at this stage, type and fields, let's explore the type key
df.schema.jsonValue()["type"]
# The value associated is struct
# let's explore the content of the other key
df.schema.jsonValue()["fields"]
#it's a list, what's the first element?
df.schema.jsonValue()["fields"][0]
# what keys does it have ?
df.schema.jsonValue()["fields"][0].keys()
# the key name contains the name of the field
df.schema.jsonValue()["fields"][0]["name"]
df.schema.jsonValue()["fields"][0]["type"]


Out[8]: 'string'

In [0]:
from pyspark.sql.types import StructType, StructField
from typing import List, Dict, Generator, Union, Callable
# This is actually written like a scala function
def walkSchema(schema: Union[StructType, StructField]) -> Generator[str, None, None]:
    
    # we define a function _walk that produces a string generator from
    # a dictionnary "schema_dct", and a string "prefix"
    def _walk(schema_dct: Dict['str', Union['str', list, dict]],
              prefix: str = "") -> Generator[str, None, None]:
        assert isinstance(prefix, str), "prefix should be a string" # check if prefix is a string
        
        # this function returns "name" if there's no prefix and "prefix.name" if prefix exists
        fullName: Callable[str, str] = lambda name: ( 
            name if not prefix else f"{prefix}.{name}")
        
        # we get the next name one level lower from the dictionnary
        name = schema_dct.get('name', '')
        
        # if the type is struct then we search for the fields key
        # if fields is there we apply the function again and dig one level deeper in
        # the schema and set a prefix
        if schema_dct['type'] == 'struct':
            assert 'fields' in schema_dct, (
                "It's a StructType, we should have some fields")
            for field in schema_dct['fields']:
                yield from _walk(field, prefix=prefix)
        # if we have a dict type and we can't find fields then we
        # dig one level deeper and apply the _walk function again
        elif isinstance(schema_dct['type'], dict):
            assert 'fields' not in schema_dct, (
                "We're missing some keys here")
            yield from _walk(schema_dct['type'], prefix=fullName(name))
        # If we finally reached the end and found a name we yield the full name
        elif name:
            yield fullName(name)
    
    yield from _walk(schema.jsonValue())

# yield as opposed to return, returns a result but does not stop the function from running, it keeps
# running even after returning one result.

In [0]:
col_names = walkSchema(df.schema)
col_names

Out[10]: <generator object walkSchema at 0x7f9caa7dd900>

In [0]:
# Iterate over the walked schema
for col_name in walkSchema(df.schema):
  print(col_name)

etag
kind
pageInfo.resultsPerPage
pageInfo.totalResults


In [0]:
# Perfect, that's all the leafs of our schema.And we can just repeat the work we did with items.snippet.title for every column of this list.


In [0]:
from functools import reduce

from pyspark.sql import functions as F
exploded_df = reduce(
  lambda memo_df, col_name: memo_df.withColumn(col_name, F.col(col_name)),
  walkSchema(df.schema), df
).drop('items')

exploded_df.limit(5).toPandas()

Unnamed: 0,etag,kind,pageInfo,pageInfo.resultsPerPage,pageInfo.totalResults
0,U0fncx_GV9jD5SKQr15LMvwuPcs,youtube#videoListResponse,"{'resultsPerPage': 38, 'totalResults': 38}",38,38
1,LZV6LlN3-4QwaIGfe9KBxl0cJvE,youtube#videoListResponse,"{'resultsPerPage': 38, 'totalResults': 38}",38,38
2,Ou4xXi-09RdImAeo1EFJC01i8iM,youtube#videoListResponse,"{'resultsPerPage': 43, 'totalResults': 43}",43,43
3,tDsVpy7PmDE2n6ZAO0rHpUpbqz0,youtube#videoListResponse,"{'resultsPerPage': 40, 'totalResults': 40}",40,40
4,otOtu8WFJDFkzdBR_PG0LptIkK4,youtube#videoListResponse,"{'resultsPerPage': 37, 'totalResults': 37}",37,37
