In [1]:
import findspark

# Run findspark.init() first so the pyspark import below can resolve
# (findspark locates the Spark installation and puts it on sys.path).
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session limited to a single worker thread ('local[1]');
# getOrCreate() reuses an already-running session if there is one.
spark = (
    SparkSession.builder
    .master('local[1]')
    .appName('pyspark_fresh')
    .getOrCreate()
)

In [8]:
import datetime
# Sample user records used to build the demo DataFrame.
# Every record has the same keys in the same order: scalar identity fields,
# a nested "phone" dict, a boolean flag, an integer amount, a date and a
# timestamp.
data = [
    {
        "id": 1,
        "firstName": "Terry",
        "lastName": "Medhurst",
        "maidenName": "Smitham",
        "age": 50,
        "gender": "male",
        "email": "atuny0@sohu.com",
        "phone": {
            "phone_number": "+63 791 675 8914",
            "mobile_number": "",  # empty string (contrast with None in record 2)
        },
        "is_customer": True,
        "amount_paid": 10200,
        "customer_from": datetime.date(2022, 3, 10),
        "last_update_ts": datetime.datetime(2023, 10, 20, 15, 0, 45),
    },
    {
        "id": 2,
        "firstName": "Sheldon",
        "lastName": "Quigley",
        "maidenName": "Cole",
        "age": 28,
        "gender": "male",
        "email": "hbingley1@plala.or.jp",
        "phone": {
            "phone_number": "+7 813 117",
            "mobile_number": None,  # explicit null value inside the dict
        },
        "is_customer": True,
        "amount_paid": 11000,
        "customer_from": datetime.date(2022, 5, 16),
        "last_update_ts": datetime.datetime(2023, 10, 20, 15, 0, 45),
    },
    {
        "id": 3,
        "firstName": "Terrill",
        "lastName": "Hills",
        "maidenName": "Hoeger",
        "age": 38,
        "gender": "male",
        "email": "rshawe2@51.la",
        "phone": {
            "phone_number": "+63 739 292 7942",
            "mobile_number": "+63 739 292 7943",
        },
        "is_customer": True,
        "amount_paid": 24000,
        "customer_from": datetime.date(2022, 4, 11),
        "last_update_ts": datetime.datetime(2023, 10, 20, 15, 0, 45),
    },
    {
        "id": 4,
        "firstName": "Miles",
        "lastName": "Cummerata",
        "maidenName": "Maggio",
        "age": 49,
        "gender": "male",
        "email": "yraigatt3@nature.com",
        "phone": {
            "phone_number": "+86 461 145 4186",
            "mobile_number": "+86 461 145 4188",
        },
        "is_customer": True,
        "amount_paid": 10500,
        "customer_from": datetime.date(2022, 8, 21),
        "last_update_ts": datetime.datetime(2023, 10, 20, 15, 0, 45),
    },
    {
        "id": 5,
        "firstName": "Mavis",
        "lastName": "Schultz",
        "maidenName": "Yundt",
        "age": 38,
        "gender": "male",
        "email": "kmeus4@upenn.edu",
        "phone": {
            "phone_number": "+372 285 771 1911",
            "mobile_number": "+372 285 771 1912",
        },
        "is_customer": True,
        "amount_paid": 10000,
        "customer_from": datetime.date(2022, 5, 1),
        "last_update_ts": datetime.datetime(2023, 10, 20, 15, 0, 45),
    },
]

In [9]:
from pyspark.sql import Row

In [10]:
# Build one Row per user dict; the dict keys become the Row's field names.
user_rows = [Row(**record) for record in data]

In [11]:
# Quick preview: build a throwaway DataFrame from the Rows (no explicit schema
# is passed) and print it. show() uses its default truncation here, which is
# why long cells such as email and phone are cut off with "..." in the output.
spark.createDataFrame(user_rows).show()

+---+---------+---------+----------+---+------+--------------------+--------------------+-----------+-----------+-------------+-------------------+
| id|firstName| lastName|maidenName|age|gender|               email|               phone|is_customer|amount_paid|customer_from|     last_update_ts|
+---+---------+---------+----------+---+------+--------------------+--------------------+-----------+-----------+-------------+-------------------+
|  1|    Terry| Medhurst|   Smitham| 50|  male|     atuny0@sohu.com|{mobile_number ->...|       true|      10200|   2022-03-10|2023-10-20 15:00:45|
|  2|  Sheldon|  Quigley|      Cole| 28|  male|hbingley1@plala.o...|{mobile_number ->...|       true|      11000|   2022-05-16|2023-10-20 15:00:45|
|  3|  Terrill|    Hills|    Hoeger| 38|  male|       rshawe2@51.la|{mobile_number ->...|       true|      24000|   2022-04-11|2023-10-20 15:00:45|
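|  4|    Miles|Cummerata|    Maggio| 49|  male|yraigatt3@nature.com|{mobile_number ->...|       true|      10500|   2022-08-21|2023-10-20 15:00:45|
|  5|    Mavis|  Schultz|     Yundt| 38|  male|    kmeus4@upenn.edu|{mobile_number ->...|       true|      10000|   2022-05-01|2023-10-20 15:00:45|
+---+---------+---------+----------+---+------+--------------------+--------------------+-----------+-----------+-------------+-------------------+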

In [12]:
# Keep a named DataFrame for the cells below. No schema is supplied, so the
# column types are whatever Spark derives from the Row values.
df = spark.createDataFrame(user_rows)

In [7]:
# Inspect the (column name, type) pairs of df. In the output below the Python
# ints appear as bigint and the nested phone dict as map<string,string>.
df.dtypes

[('id', 'bigint'),
 ('firstName', 'string'),
 ('lastName', 'string'),
 ('maidenName', 'string'),
 ('age', 'bigint'),
 ('gender', 'string'),
 ('email', 'string'),
 ('phone', 'map<string,string>'),
 ('is_customer', 'boolean'),
 ('amount_paid', 'bigint'),
 ('customer_from', 'date'),
 ('last_update_ts', 'timestamp')]

In [13]:
from pyspark.sql.functions import col

In [16]:
# Pull individual entries out of the phone map by key and expose them as
# plain columns. A key whose value is null in the map comes back as null.
df.select(
    'id',
    col('phone').getItem('phone_number').alias('Phone'),
    col('phone').getItem('mobile_number').alias('Mobile'),
).show()

+---+-----------------+-----------------+
| id|            Phone|           Mobile|
+---+-----------------+-----------------+
|  1| +63 791 675 8914|                 |
|  2|       +7 813 117|             null|
|  3| +63 739 292 7942| +63 739 292 7943|
|  4| +86 461 145 4186| +86 461 145 4188|
|  5|+372 285 771 1911|+372 285 771 1912|
+---+-----------------+-----------------+



In [19]:
from pyspark.sql.functions import explode

In [24]:
# Unpivot the phone map: explode() on a map column emits one output row per
# map entry, as a (key, value) pair of columns. The pair is named directly
# with a multi-name alias instead of renaming the default key/value columns
# afterwards. The original `phone` column is not part of this select, so there
# is nothing to drop.
# Entries whose value is null (e.g. user 2's mobile_number) are still emitted.
df.select(
    'id',
    'firstName',
    explode('phone').alias('phone_type', 'phone_number'),
).show()

+---+---------+-------------+-----------------+
| id|firstName|   phone_type|     phone_number|
+---+---------+-------------+-----------------+
|  1|    Terry|mobile_number|                 |
|  1|    Terry| phone_number| +63 791 675 8914|
|  2|  Sheldon|mobile_number|             null|
|  2|  Sheldon| phone_number|       +7 813 117|
|  3|  Terrill|mobile_number| +63 739 292 7943|
|  3|  Terrill| phone_number| +63 739 292 7942|
|  4|    Miles|mobile_number| +86 461 145 4188|
|  4|    Miles| phone_number| +86 461 145 4186|
|  5|    Mavis|mobile_number|+372 285 771 1912|
|  5|    Mavis| phone_number|+372 285 771 1911|
+---+---------+-------------+-----------------+



In [25]:
from pyspark.sql.functions import explode_outer

In [26]:
# Same unpivot as with explode(), but explode_outer() also keeps rows whose
# map is null or empty (emitting nulls for the generated columns) instead of
# dropping them. Every user in this dataset has a populated phone map, so the
# result here is identical to the explode() output.
# The generated (key, value) pair is named directly with a multi-name alias.
# The original `phone` column is not part of this select, so there is nothing
# to drop.
df.select(
    'id',
    'firstName',
    explode_outer('phone').alias('phone_type', 'phone_number'),
).show()

+---+---------+-------------+-----------------+
| id|firstName|   phone_type|     phone_number|
+---+---------+-------------+-----------------+
|  1|    Terry|mobile_number|                 |
|  1|    Terry| phone_number| +63 791 675 8914|
|  2|  Sheldon|mobile_number|             null|
|  2|  Sheldon| phone_number|       +7 813 117|
|  3|  Terrill|mobile_number| +63 739 292 7943|
|  3|  Terrill| phone_number| +63 739 292 7942|
|  4|    Miles|mobile_number| +86 461 145 4188|
|  4|    Miles| phone_number| +86 461 145 4186|
|  5|    Mavis|mobile_number|+372 285 771 1912|
|  5|    Mavis| phone_number|+372 285 771 1911|
+---+---------+-------------+-----------------+

