# 02. Data transformation

In the previous notebook, we saw how to download tweets. Note that we only stored the original responses; we have not done any cleaning or transformation yet.

If the data ingestion process is done by different people using different tools and/or API versions, the tweets may have different schemas.

To use these tweets, we need to clean them and transform them into a single, unified schema that we can work with afterwards.

In this notebook, we will show you how to do that.
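Conceptually, the transformation maps each source-specific record onto one target schema. The plain-Python sketch below models that step, assuming a target schema of `name`, `text` and `date` (the columns of `path2`) and assuming the nested `user.name` and `created_at` fields that section 2.2 selects from `df3` also exist in the `current` data. `FIELD_MAP`, `get_path` and `to_unified` are hypothetical helpers, not part of PySpark, and the sample tweet is toy data.

```python
from typing import Any, Dict

# Hypothetical mapping: for each source, the key path of each target field.
FIELD_MAP: Dict[str, Dict[str, tuple]] = {
    "current": {"name": ("user", "name"), "text": ("text",), "date": ("created_at",)},
    "current_short": {"name": ("name",), "text": ("text",), "date": ("date",)},
    "old": {"name": ("user", "name"), "text": ("text",), "date": ("created_at",)},
}


def get_path(record: Dict[str, Any], path: tuple) -> Any:
    """Follow a sequence of keys into nested dicts; return None if any key is missing."""
    value: Any = record
    for key in path:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def to_unified(source: str, record: Dict[str, Any]) -> Dict[str, Any]:
    """Project one raw record onto the target schema {name, text, date}."""
    return {field: get_path(record, path) for field, path in FIELD_MAP[source].items()}


if __name__ == "__main__":
    # Toy record shaped like a nested tweet object
    tweet = {"user": {"name": "alice", "id_str": "42"}, "text": "hi", "created_at": "2021-12-05"}
    print(to_unified("current", tweet))
    # {'name': 'alice', 'text': 'hi', 'date': '2021-12-05'}
```

Missing fields become `None` rather than raising, so records from an older or partial export still fit the target schema; in Spark the equivalent projection is a `select` with `alias`.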



Raw data sources:
- s3a://pengfei/diffusion/demo_prod/current/2021
- s3a://pengfei/diffusion/demo_prod/current_short/2021
- s3a://pengfei/diffusion/demo_prod/old/2010_2021

In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType
from pyspark.sql.functions import lit, col, when, concat, udf
import os

In [2]:
local=False
if local:
    spark=SparkSession.builder.master("local[4]") \
                  .config('spark.jars.packages', 'org.postgresql:postgresql:42.2.24') \
                  .appName("Twitter_Data_Transformation").getOrCreate()
else:
    spark=SparkSession.builder \
                      .master("k8s://https://kubernetes.default.svc:443") \
                      .appName("Twitter_Data_Transformation") \
                      .config("spark.kubernetes.container.image",os.environ["IMAGE_NAME"]) \
                      .config("spark.kubernetes.authenticate.driver.serviceAccountName",os.environ['KUBERNETES_SERVICE_ACCOUNT']) \
                      .config("spark.kubernetes.driver.pod.name", os.environ['KUBERNETES_POD_NAME'])\
                      .config("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE']) \
                      .config("spark.executor.instances", "4") \
                      .config("spark.executor.memory","8g") \
                      .config('spark.jars.packages','org.postgresql:postgresql:42.2.24') \
                      .getOrCreate()
# spark.kubernetes.driver.pod.name config will enable executor Pod Garbage Collection




## 2.1 Explore the different datasets

In [None]:
path1 = "s3a://pengfei/diffusion/demo_prod/current/2021"
path2 = "s3a://pengfei/diffusion/demo_prod/current_short/2021"
path3 = "s3a://pengfei/diffusion/demo_prod/old/2010_2021"

### 2.1.1 Check data of path1

In [6]:
df1=spark.read.parquet(path1)

2021-12-05 11:40:54,632 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [8]:
df1.count()


13158

In [17]:
df1.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- monetizable: boolean (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- i

### 2.1.2 Check data of path2

In [10]:
df2=spark.read.parquet(path2)

In [11]:
df2.count()

700

In [13]:
df2.printSchema()

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- __index_level_0__: long (nullable = true)



### 2.1.3 Check data of path3

In [14]:
df3=spark.read.parquet(path3)

In [15]:
df3.count()


2062

In [None]:
df3.printSchema()

**Notice that the three datasets all have different schemas.**

## 2.2 Merge the three datasets
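To see exactly how two schemas differ, one option is to compare their top-level column names. In PySpark, `DataFrame.columns` returns them as a list of strings, so the hypothetical helper `schema_diff` below could be called as `schema_diff(df1.columns, df2.columns)`. The example uses the `df2` columns printed above and only a subset of the `df1` columns (those visible in the truncated `printSchema` output plus `text` and `user`, which section 2.2 selects).

```python
from typing import Iterable, List, Tuple


def schema_diff(left: Iterable[str], right: Iterable[str]) -> Tuple[List[str], List[str], List[str]]:
    """Return (only_in_left, only_in_right, common) top-level column names, each sorted."""
    a, b = set(left), set(right)
    return sorted(a - b), sorted(b - a), sorted(a & b)


if __name__ == "__main__":
    # Subset of df1's columns; the full schema is much larger
    df1_cols = ["contributors", "coordinates", "created_at", "display_text_range", "entities", "text", "user"]
    df2_cols = ["name", "date", "text", "__index_level_0__"]
    only1, only2, common = schema_diff(df1_cols, df2_cols)
    print(only2)   # ['__index_level_0__', 'date', 'name']
    print(common)  # ['text']
```

This only compares top-level names; nested structs such as `user` or `entities` would need a recursive walk over `df.schema` to compare field by field.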

In [1]:
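# Select the same three fields as the other sources, in the same order.
# Assign first, then display: .show() only prints rows and returns None.
df1_tr=df1.select(df1.user.name.alias("name"), df1.text, df1.created_at.alias("date"))
df1_tr.show(2, truncate=False)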

In [None]:
df2_tr=df2.select(col("name"),col("text"),col("date"))

In [None]:
# Alias the nested fields so the column names match df2_tr
df3_tr=df3.select(df3.user.name.alias("name"), df3.text, df3.created_at.alias("date"))

In [None]:
union_1=df1_tr.union(df2_tr).union(df3_tr)
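`DataFrame.union` resolves columns by position, not by name (like SQL `UNION ALL`, it also keeps duplicates), and the result takes its column names from the first DataFrame; `DataFrame.unionByName` matches columns by name instead. That is why each `select` above must list its columns in the same order. The plain-Python model below, with hypothetical helpers `union_by_position` and `union_by_name`, only mimics these semantics to show what goes wrong when the order differs; it is not Spark code.

```python
from typing import List, Tuple

Table = Tuple[List[str], List[tuple]]  # (column names, rows)


def union_by_position(a: Table, b: Table) -> Table:
    """Mimic DataFrame.union: rows are concatenated positionally, names come from `a`."""
    cols_a, rows_a = a
    cols_b, rows_b = b
    if len(cols_a) != len(cols_b):
        raise ValueError("union requires the same number of columns")
    return cols_a, rows_a + rows_b


def union_by_name(a: Table, b: Table) -> Table:
    """Mimic DataFrame.unionByName: reorder b's values to follow a's column names."""
    cols_a, rows_a = a
    cols_b, rows_b = b
    if set(cols_a) != set(cols_b):
        raise ValueError("unionByName requires the same column names")
    index = [cols_b.index(c) for c in cols_a]
    return cols_a, rows_a + [tuple(row[i] for i in index) for row in rows_b]


if __name__ == "__main__":
    left = (["name", "text", "date"], [("alice", "hi", "d1")])
    right = (["name", "date", "text"], [("bob", "d2", "yo")])  # columns in a different order
    print(union_by_position(left, right)[1])  # [('alice', 'hi', 'd1'), ('bob', 'd2', 'yo')]
    print(union_by_name(left, right)[1])      # [('alice', 'hi', 'd1'), ('bob', 'yo', 'd2')]
```

With positional union, Bob's date silently lands in the `text` column; matching by name avoids this, at the cost of requiring identical column names in every input.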