In [1]:
# %load user.avsc
{
    "namespace": "user.profile",
    "type": "record",
    "name": "User",
    "fields": [
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "age",
            "type": "int"
        },
        {
            "name": "favorite_books",
            "type": {
                "type": "array",
                "items": {
                    "type": "record",
                    "name": "Book",
                    "fields": [
                        {
                            "name": "name",
                            "type": "string"
                        },{
                            "name": "publish_year",
                            "type": "int"
                        },{
                            "name": "author",
                            "type": "string"
                        }
                    ]
                }
            }

        }
    ]
}

{'namespace': 'user.profile',
 'type': 'record',
 'name': 'User',
 'fields': [{'name': 'name', 'type': 'string'},
  {'name': 'age', 'type': 'int'},
  {'name': 'favorite_books',
   'type': {'type': 'array',
    'items': {'type': 'record',
     'name': 'Book',
     'fields': [{'name': 'name', 'type': 'string'},
      {'name': 'publish_year', 'type': 'int'},
      {'name': 'author', 'type': 'string'}]}}}]}

In [2]:
book_1 = {
    "name": "Avro Example",
    "publish_year": 2020,
    "author": "Ben"
}

book_2 = {
    "name": "Avro Explorer",
    "publish_year": 2021,
    "author": "Jim"
}

book_3 = {
    "name": "Data Extraction",
    "publish_year": 2019,
    "author": "Ted"
}

user_1 = {
    "name": "Ken",
    "age": 17,
    "favorite_books": [
        book_1
    ]
}

user_2 = {
    "name": "Eva",
    "age": 17,
    "favorite_books": [
        book_2, book_3
    ]
}

In [3]:
%%time

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

schema = avro.schema.parse(open("user.avsc", "rb").read())

# not closable
writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)

for i in range(1, 100001):
    writer.append(user_1)
    writer.append(user_2)

writer.close()

CPU times: user 6.74 s, sys: 18.1 ms, total: 6.76 s
Wall time: 6.86 s


In [4]:
# not closable
# reader = DataFileReader(open("users.avro", "rb"), DatumReader())
# for user in reader:
#     print(user)

In [5]:
%%time
from fastavro import writer, reader
from fastavro.schema import load_schema

schema = load_schema("user.avsc")
print(schema)
users = []

for input in range(1, 100001):
    users.append(user_1)
    users.append(user_2)

with open('users_fast.avro', 'wb') as out:
    writer(out, schema, users)

{'type': 'record', 'name': 'user.profile.User', 'fields': [{'name': 'name', 'type': 'string'}, {'name': 'age', 'type': 'int'}, {'name': 'favorite_books', 'type': {'type': 'array', 'items': {'type': 'record', 'name': 'user.profile.Book', 'fields': [{'name': 'name', 'type': 'string'}, {'name': 'publish_year', 'type': 'int'}, {'name': 'author', 'type': 'string'}]}}}], '__fastavro_parsed': True, '__named_schemas': {'user.profile.User': {'type': 'record', 'name': 'user.profile.User', 'fields': [{'name': 'name', 'type': 'string'}, {'name': 'age', 'type': 'int'}, {'name': 'favorite_books', 'type': {'type': 'array', 'items': {'type': 'record', 'name': 'user.profile.Book', 'fields': [{'name': 'name', 'type': 'string'}, {'name': 'publish_year', 'type': 'int'}, {'name': 'author', 'type': 'string'}]}}}]}, 'user.profile.Book': {'type': 'record', 'name': 'user.profile.Book', 'fields': [{'name': 'name', 'type': 'string'}, {'name': 'publish_year', 'type': 'int'}, {'name': 'author', 'type': 'string'}]}

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .config("spark.jars.packages", "com.databricks:spark-avro_2.11:4.0.0") \
                    .getOrCreate()

In [7]:
df = spark.read.format("com.databricks.spark.avro").load("users.avro")
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- favorite_books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- publish_year: integer (nullable = true)
 |    |    |-- author: string (nullable = true)



In [8]:
from pyspark.sql.functions import *
explode_col = explode('favorite_books').alias("favorite_book")
exploded_df = df.select(
    "name", 
    "age",
    explode_col
)

exploded_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- favorite_book: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- publish_year: integer (nullable = true)
 |    |-- author: string (nullable = true)



In [9]:
flatten_df = exploded_df.select(
    "name", 
    "age",
    col("favorite_book.name").alias("favorite_book_name"),
    col("favorite_book.publish_year").alias("favorite_book_publish_year"),
    col("favorite_book.author").alias("favorite_book_author"),
)

flatten_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- favorite_book_name: string (nullable = true)
 |-- favorite_book_publish_year: integer (nullable = true)
 |-- favorite_book_author: string (nullable = true)



In [10]:
flatten_df.show()

+----+---+------------------+--------------------------+--------------------+
|name|age|favorite_book_name|favorite_book_publish_year|favorite_book_author|
+----+---+------------------+--------------------------+--------------------+
| Ken| 17|      Avro Example|                      2020|                 Ben|
| Eva| 17|     Avro Explorer|                      2021|                 Jim|
| Eva| 17|   Data Extraction|                      2019|                 Ted|
| Ken| 17|      Avro Example|                      2020|                 Ben|
| Eva| 17|     Avro Explorer|                      2021|                 Jim|
| Eva| 17|   Data Extraction|                      2019|                 Ted|
| Ken| 17|      Avro Example|                      2020|                 Ben|
| Eva| 17|     Avro Explorer|                      2021|                 Jim|
| Eva| 17|   Data Extraction|                      2019|                 Ted|
| Ken| 17|      Avro Example|                      2020|        

In [11]:
flatten_df.agg(count("*").alias("row_count")).show()

+---------+
|row_count|
+---------+
|   300000|
+---------+



In [12]:
flatten_df.repartition(1).write \
    .format("com.databricks.spark.avro") \
    .save("flatten_users")

In [13]:
validate_df = spark.read.format("com.databricks.spark.avro").load("flatten_users")
validate_df.groupby("favorite_book_name").agg(count("*").alias("number_of_users")).show()

+------------------+---------------+
|favorite_book_name|number_of_users|
+------------------+---------------+
|   Data Extraction|         100000|
|     Avro Explorer|         100000|
|      Avro Example|         100000|
+------------------+---------------+

