In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, ArrayType, BooleanType
import json

# Attach to the already-running Spark session if there is one; otherwise
# start a new session under the given application name.
spark = (
    SparkSession.builder
    .appName("Glue stuff")
    .getOrCreate()
)

In [9]:
# S3 location of the users-settings parquet snapshot (version_14, single part file).
users_db_data = (
    "s3://of-lakeformation-users/"
    "users_users_api_public_users_settings/version_14/"
    "part-00000-d530da31-0c19-4260-87f6-81b97a5778ee-c000.snappy.parquet"
)

In [None]:
!aws s3 cp s3://of-lakeformation-users/users_users_api_public_users_settings/version_14/part-00000-d530da31-0c19-4260-87f6-81b97a5778ee-c000.snappy.parquet users_db_data.parquet

In [2]:
# Schema for a whole record if `value` were already a decoded struct: the
# settings payload plus its timestamp, wrapped under "value", next to "key".
# NOTE(review): no cell shown here uses this schema. The printSchema() output
# reports `value` as a plain string, so this schema cannot describe the raw
# parquet file directly. Its inner part duplicates `user_settings_schema`.
# Consider deleting it.
_full_settings_struct = StructType([
    StructField("meta.language", StringType(), True),
    StructField("favourite.club", IntegerType(), True),
    StructField("following.clubs", ArrayType(IntegerType()), True),
    StructField("push.digest_news", BooleanType(), True),
    StructField("following.leagues", ArrayType(IntegerType()), True),
    StructField("meta.geoip_country", StringType(), True),
    StructField(
        "ordered_menu_items",
        ArrayType(StructType([
            StructField("id", IntegerType(), True),
            StructField("type", StringType(), True),
        ])),
        True,
    ),
    StructField("push.breaking_news", BooleanType(), True),
    StructField(
        "push.enabled_teams",
        ArrayType(StructType([
            StructField("id", IntegerType(), True),
            StructField("eventTypes", ArrayType(StringType()), True),
        ])),
        True,
    ),
    StructField("favourite.national_team", IntegerType(), True),
    StructField("following.national_teams", ArrayType(IntegerType()), True),
    StructField("meta.psychological_country", StringType(), True),
])

_full_value_struct = StructType([
    StructField("settings", _full_settings_struct, True),
    StructField("updatedAt", TimestampType(), True),
])

full_user_settings_schema = StructType([
    StructField("value", _full_value_struct, True),
    StructField("key", StringType(), True),
])

In [52]:
# Read the local copy written by the `aws s3 cp` cell. The explicit parquet
# reader does not depend on `spark.sql.sources.default`, which a format-less
# `spark.read.load()` falls back to (e.g. it may be delta on some platforms).
df = spark.read.parquet("users_db_data.parquet")

In [53]:
# Peek at the raw rows: first 20, long cell values truncated, tabular layout.
df.show(n=20, truncate=True, vertical=False)

+--------------------+--------------------+
|               value|                 key|
+--------------------+--------------------+
|{"settings": {"me...|a3d7e839643792a1c...|
|{"settings": {"me...|c1b3c64ee94d31643...|
|{"settings": {"fa...|0983e155a52724599...|
|{"settings": {"fo...|47335bad0b449e3a4...|
|{"settings": {"me...|7d17b54f4001a4920...|
|{"settings": {"me...|f41e32944f9087d25...|
|{"settings": {"me...|78ec38686a40c9313...|
|{"settings": {"me...|b28efe7f4556c57c2...|
|{"settings": {"me...|2b00dac525cc6093b...|
|{"settings": {"me...|8150a9651851856e7...|
|{"settings": {"me...|4f97b6ad8609ed999...|
|{"settings": {"me...|dd0b92a8353353aaf...|
|{"settings": {"fa...|79630d4f1c9e61cdd...|
|{"settings": {"me...|4202d583100c2d337...|
|{"settings": {"me...|2dcbe86777488b425...|
|{"settings": {"me...|0be37627ac47e8eca...|
|{"settings": {"me...|49a4e33b3ffb5da0e...|
|{"settings": {"me...|9757c9f45d8372e50...|
|{"settings": {"me...|2f4bcf67c8d28a59f...|
|{"settings": {"fa...|fc237ddd81

In [7]:
# Inspect the raw file layout. The recorded output shows `value` and `key` are
# both strings, i.e. `value` holds JSON text that still has to be decoded
# (done with from_json in the parsing cell further down).
df.printSchema()

root
 |-- value: string (nullable = true)
 |-- key: string (nullable = true)



In [8]:
# NOTE(review): this repeats the earlier `df.show()` cell verbatim (same
# output). It looks like a leftover from out-of-order execution; consider
# deleting it.
df.show(20, True)

+--------------------+--------------------+
|               value|                 key|
+--------------------+--------------------+
|{"settings": {"me...|a3d7e839643792a1c...|
|{"settings": {"me...|c1b3c64ee94d31643...|
|{"settings": {"fa...|0983e155a52724599...|
|{"settings": {"fo...|47335bad0b449e3a4...|
|{"settings": {"me...|7d17b54f4001a4920...|
|{"settings": {"me...|f41e32944f9087d25...|
|{"settings": {"me...|78ec38686a40c9313...|
|{"settings": {"me...|b28efe7f4556c57c2...|
|{"settings": {"me...|2b00dac525cc6093b...|
|{"settings": {"me...|8150a9651851856e7...|
|{"settings": {"me...|4f97b6ad8609ed999...|
|{"settings": {"me...|dd0b92a8353353aaf...|
|{"settings": {"fa...|79630d4f1c9e61cdd...|
|{"settings": {"me...|4202d583100c2d337...|
|{"settings": {"me...|2dcbe86777488b425...|
|{"settings": {"me...|0be37627ac47e8eca...|
|{"settings": {"me...|49a4e33b3ffb5da0e...|
|{"settings": {"me...|9757c9f45d8372e50...|
|{"settings": {"me...|2f4bcf67c8d28a59f...|
|{"settings": {"fa...|fc237ddd81

In [36]:
# Schema of the JSON document stored in the `value` column. The source keys
# contain literal dots (e.g. "meta.language"), so they are declared verbatim
# here for from_json to match them.
user_settings_schema = StructType([
    StructField("settings", StructType([
        StructField("meta.language", StringType(), True),
        StructField("favourite.club", IntegerType(), True),
        StructField("following.clubs", ArrayType(IntegerType()), True),
        StructField("push.digest_news", BooleanType(), True),
        StructField("following.leagues", ArrayType(IntegerType()), True),
        StructField("meta.geoip_country", StringType(), True),
        StructField("ordered_menu_items", ArrayType(StructType([
            StructField("id", IntegerType()),
            StructField("type", StringType()),
        ])), True),
        StructField("push.breaking_news", BooleanType(), True),
        StructField("push.enabled_teams", ArrayType(StructType([
            StructField("id", IntegerType()),
            StructField("eventTypes", ArrayType(StringType())),
        ])), True),
        StructField("favourite.national_team", IntegerType(), True),
        StructField("following.national_teams", ArrayType(IntegerType()), True),
        StructField("meta.psychological_country", StringType(), True),
    ]), True),
    StructField("updatedAt", TimestampType(), True)
])


def _underscore_field_names(data_type):
    """Return a copy of ``data_type`` with '.' replaced by '_' in every struct field name.

    Recurses into nested StructType fields and ArrayType element types. Any
    other type is returned unchanged. Nullability (``nullable`` /
    ``containsNull``) and field metadata are carried over as-is.
    """
    if isinstance(data_type, StructType):
        return StructType([
            StructField(
                field.name.replace(".", "_"),
                _underscore_field_names(field.dataType),
                field.nullable,
                field.metadata,
            )
            for field in data_type.fields
        ])
    if isinstance(data_type, ArrayType):
        return ArrayType(_underscore_field_names(data_type.elementType), data_type.containsNull)
    return data_type


# Same structure as `user_settings_schema`, with dots in field names replaced
# by underscores so nested fields can be referenced with plain dot paths.
# A struct-to-struct cast maps fields by position. Deriving this schema,
# rather than keeping a second hand-written copy, guarantees its field order
# and types always match the source schema.
user_settings_schema_renamed = _underscore_field_names(user_settings_schema)

In [42]:
# Decode the JSON text in `value` using the dotted-key schema. Then cast the
# resulting struct onto the underscore schema: struct casts map fields by
# position, so this renames the fields and leaves types untouched. Field names
# containing dots would otherwise need backtick quoting in column paths.
decoded_settings = F.from_json(F.col("value"), user_settings_schema)

parsed_df = df.select(
    F.col("key").alias("user_id"),
    decoded_settings.cast(user_settings_schema_renamed).alias("settings"),
)
parsed_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- settings: struct (nullable = true)
 |    |-- settings: struct (nullable = true)
 |    |    |-- meta_language: string (nullable = true)
 |    |    |-- favourite_club: integer (nullable = true)
 |    |    |-- following_clubs: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- push_digest_news: boolean (nullable = true)
 |    |    |-- following_leagues: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- meta_geoip_country: string (nullable = true)
 |    |    |-- ordered_menu_items: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: integer (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |-- push_breaking_news: boolean (nullable = true)
 |    |    |-- push_enabled_teams: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |

In [50]:
# One output row per (user_id, followed club id). explode() yields no rows
# when the array is null or empty, so users who follow no clubs are absent;
# use explode_outer if they should be kept with a null team_id.
followed_teams = (
    parsed_df
    .select(
        F.col("user_id"),
        F.explode(F.col("settings.settings.following_clubs")).alias("team_id"),
    )
)

In [51]:
# Preview the exploded (user_id, team_id) pairs: first 20 rows, truncated cells.
followed_teams.show(n=20, truncate=True)

+--------------------+-------+
|             user_id|team_id|
+--------------------+-------+
|f41e32944f9087d25...|   1696|
|78ec38686a40c9313...|    149|
|b28efe7f4556c57c2...|   1693|
|b28efe7f4556c57c2...|      5|
|b28efe7f4556c57c2...|     26|
|4f97b6ad8609ed999...|      6|
|4f97b6ad8609ed999...|    202|
|dd0b92a8353353aaf...|     16|
|4202d583100c2d337...|   1649|
|49a4e33b3ffb5da0e...|     17|
|49a4e33b3ffb5da0e...|    400|
|2f4bcf67c8d28a59f...|    144|
|480cc8d208a6db705...|   1787|
|480cc8d208a6db705...|    575|
|f5986a7783e8da108...|   2304|
|f5986a7783e8da108...|      6|
|f5986a7783e8da108...|    154|
|f5986a7783e8da108...|    155|
|f5986a7783e8da108...|     16|
|f5986a7783e8da108...|     23|
+--------------------+-------+
only showing top 20 rows

