# Silver Layer(2) - Data Transformation

In [1]:
# Keep <pre> output on one line so wide tables get a horizontal scroll bar.
from IPython.display import display,HTML
scroll_css = HTML("<style>pre {white-space:pre !important;}</style> ")
display(scroll_css)

In [290]:
import re

import findspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

In [291]:
# Let findspark locate the Spark installation and configure the environment.
findspark.init()

# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

# Keep a handle on the session's underlying SparkContext.
sc = spark.sparkContext

In [292]:
# Read the patents JSON file into a Spark DataFrame.
patents = spark.read.json('cleaned_patents.json')

In [293]:
# Preview the first 4 rows.
patents.show(4)

+--------------------+--------------------+----------------+---------------------+------------------+-----------------------+------------------------+--------------+-------+-----------+----------------------+-----------------------+----------+--------------------+--------------------+-------------+----------+------+--------------------+
|       abstract_text|          applicants|application date|assignee_name_current|assignee_name_orig|backward_cite_no_family|backward_cite_yes_family|          code|country|filing_date|forward_cite_no_family|forward_cite_yes_family|grant_date|       inventor_name|                link|priority_date|  pub_date|source|               title|
+--------------------+--------------------+----------------+---------------------+------------------+-----------------------+------------------------+--------------+-------+-----------+----------------------+-----------------------+----------+--------------------+--------------------+-------------+----------+------+-----

In [294]:
# Print the column names and types of the loaded data.
patents.printSchema()

root
 |-- abstract_text: string (nullable = true)
 |-- applicants: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- application date: string (nullable = true)
 |-- assignee_name_current: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- assignee_name_orig: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- backward_cite_no_family: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- patent_number: string (nullable = true)
 |    |    |-- priority_date: string (nullable = true)
 |    |    |-- pub_date: string (nullable = true)
 |-- backward_cite_yes_family: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- patent_number: string (nullable = true)
 |    |    |-- priority_date: string (nullable = true)
 |    |    |-- pub_date: string (nullable = true)
 |-- code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- filing_date

In [295]:
# Print a sample of rows to inspect the data before transforming it.
patents.show()


+--------------------+--------------------+----------------+---------------------+--------------------+-----------------------+------------------------+--------------+-------+-----------+----------------------+-----------------------+----------+--------------------------+--------------------+-------------+----------+-------------+--------------------+
|       abstract_text|          applicants|application date|assignee_name_current|  assignee_name_orig|backward_cite_no_family|backward_cite_yes_family|          code|country|filing_date|forward_cite_no_family|forward_cite_yes_family|grant_date|             inventor_name|                link|priority_date|  pub_date|       source|               title|
+--------------------+--------------------+----------------+---------------------+--------------------+-----------------------+------------------------+--------------+-------+-----------+----------------------+-----------------------+----------+--------------------------+--------------------

In [296]:
# Replace the space in the column name with an underscore, matching the other snake_case columns.
patents = patents.withColumnRenamed("application date", "application_date")

In [297]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType


In [298]:
# Define the UDF to handle nested arrays
def empty_nested_list_to_none(arr):
    """Normalise an empty nested array so that it is represented by nulls.

    Args:
        arr: A list whose elements are sized objects (lists, tuples, Rows),
            or None. Individual elements may themselves be None.

    Returns:
        None when ``arr`` is None or has no elements; ``[None]`` when every
        element is None or empty; otherwise ``arr`` unchanged.
    """
    if arr is None or len(arr) == 0:
        return None
    # Null elements count as empty: calling len() on them would raise TypeError.
    if all(inner_arr is None or len(inner_arr) == 0 for inner_arr in arr):
        return [None]
    return arr

In [299]:
# Schema of a single citation entry: three nullable string fields.
inner_struct = StructType([
    StructField(field_name, StringType(), nullable=True)
    for field_name in ("patent_number", "priority_date", "pub_date")
])

# The citation columns are arrays of that struct.
outer_array_schema = ArrayType(inner_struct)

# Spark UDF wrapping empty_nested_list_to_none, declared to return that array type.
nested_list_to_none = udf(empty_nested_list_to_none, outer_array_schema)

In [300]:
def empty_list_to_none(l):
    """Return None for a missing or empty list, otherwise the list itself.

    Args:
        l: A list, or None.

    Returns:
        None when ``l`` is None or has no elements; otherwise ``l`` unchanged.
    """
    # Check for None first: len(None) would raise TypeError.
    if l is None or len(l) == 0:
        return None
    return l

In [301]:
# Spark UDF wrapping empty_list_to_none, declared to return an array of strings.
list_to_none = udf(empty_list_to_none,ArrayType(StringType()))

In [302]:
def empty_string_to_none(s):
    """Return None for a missing or empty string, otherwise the string itself.

    Args:
        s: A string, or None.

    Returns:
        None when ``s`` is None or has length 0; otherwise ``s`` unchanged.
    """
    # Check for None first: len(None) would raise TypeError.
    if s is None or len(s) == 0:
        return None
    return s

In [303]:
# Spark UDF wrapping empty_string_to_none, declared to return a string.
string_to_none = udf(empty_string_to_none,StringType())

In [304]:
# Citation columns that are passed through the nested_list_to_none UDF.
columns = [
    'backward_cite_no_family',
    'backward_cite_yes_family',
    'forward_cite_no_family',
    'forward_cite_yes_family',
]

for citation_col in columns:
    cleaned = nested_list_to_none(patents[citation_col])
    patents = patents.withColumn(citation_col, cleaned)

In [305]:
# Name-list columns that are passed through the list_to_none UDF.
columns = [
    "assignee_name_current",
    "assignee_name_orig",
    "inventor_name",
    "applicants",
]

for name_col in columns:
    cleaned = list_to_none(patents[name_col])
    patents = patents.withColumn(name_col, cleaned)

In [306]:
# Date columns that are passed through the string_to_none UDF.
columns = [
    'application_date',
    'grant_date',
    'priority_date',
    'pub_date',
]

for date_col in columns:
    cleaned = string_to_none(patents[date_col])
    patents = patents.withColumn(date_col, cleaned)

In [307]:
# Inspect the rows after the empty-value clean-up above.
patents.show()

+--------------------+--------------------+----------------+---------------------+--------------------+-----------------------+------------------------+--------------+-------+-----------+----------------------+-----------------------+----------+--------------------------+--------------------+-------------+----------+-------------+--------------------+
|       abstract_text|          applicants|application_date|assignee_name_current|  assignee_name_orig|backward_cite_no_family|backward_cite_yes_family|          code|country|filing_date|forward_cite_no_family|forward_cite_yes_family|grant_date|             inventor_name|                link|priority_date|  pub_date|       source|               title|
+--------------------+--------------------+----------------+---------------------+--------------------+-----------------------+------------------------+--------------+-------+-----------+----------------------+-----------------------+----------+--------------------------+--------------------

In [308]:
# Remove the filing_date column from the DataFrame.
# NOTE(review): presumably redundant with application_date; confirm before relying on this.
patents=patents.drop("filing_date")

In [309]:
def extract_country(patent_code):
    """Return the leading alphabetic prefix of a patent code.

    Args:
        patent_code: Patent code string such as "US1234567B2", or None.

    Returns:
        The run of ASCII letters at the start of ``patent_code`` when it is
        immediately followed by at least one digit; None when the code is
        None or does not start with that pattern.
    """
    # re.match raises TypeError on None, so null codes are handled up front.
    if patent_code is None:
        return None
    match = re.match(r'([A-Za-z]+)\d+', patent_code)
    return match.group(1) if match else None

# Spark UDF wrapping extract_country, declared to return a string.
create_country = udf(extract_country, StringType())

# Overwrite "country" with the prefix parsed from each row's patent code.
code_column = patents["code"]
patents = patents.withColumn("country", create_country(code_column))

In [310]:
# Inspect the rows after dropping filing_date and recomputing country.
patents.show()

+--------------------+--------------------+----------------+---------------------+--------------------+-----------------------+------------------------+--------------+-------+----------------------+-----------------------+----------+--------------------------+--------------------+-------------+----------+-------------+--------------------+
|       abstract_text|          applicants|application_date|assignee_name_current|  assignee_name_orig|backward_cite_no_family|backward_cite_yes_family|          code|country|forward_cite_no_family|forward_cite_yes_family|grant_date|             inventor_name|                link|priority_date|  pub_date|       source|               title|
+--------------------+--------------------+----------------+---------------------+--------------------+-----------------------+------------------------+--------------+-------+----------------------+-----------------------+----------+--------------------------+--------------------+-------------+----------+----------

In [311]:
# Print the schema again to check the column names and types after the transformations.
patents.printSchema()

root
 |-- abstract_text: string (nullable = true)
 |-- applicants: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- application_date: string (nullable = true)
 |-- assignee_name_current: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- assignee_name_orig: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- backward_cite_no_family: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- patent_number: string (nullable = true)
 |    |    |-- priority_date: string (nullable = true)
 |    |    |-- pub_date: string (nullable = true)
 |-- backward_cite_yes_family: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- patent_number: string (nullable = true)
 |    |    |-- priority_date: string (nullable = true)
 |    |    |-- pub_date: string (nullable = true)
 |-- code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- forward_cit

In [None]:
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import udf

def None_list_unk(s):
    """Substitute the placeholder list ``["unk"]`` for a missing value.

    Args:
        s: A list, or None.

    Returns:
        ``["unk"]`` when ``s`` is None; otherwise ``s`` unchanged
        (an empty list is returned as-is).
    """
    return ["unk"] if s is None else s
# Spark UDF wrapping None_list_unk, declared to return an array of strings.
udf_none_list_unk=udf(None_list_unk,ArrayType(StringType()))

In [None]:
# Replace null assignee lists with the ["unk"] placeholder.
# The columns are read from `patents` itself (the DataFrame being transformed),
# so the cell runs on a fresh kernel without relying on any other variable.
patents=patents.withColumn("assignee_name_current",udf_none_list_unk(patents["assignee_name_current"]))
patents=patents.withColumn("assignee_name_orig",udf_none_list_unk(patents["assignee_name_orig"]))

In [312]:
# Reduce the DataFrame to a single partition before it is written out.
patents = patents.coalesce(1)

In [314]:
# Write the transformed DataFrame as JSON under the path 'transformed_data.json'.
# NOTE(review): no save mode is set, so Spark's default applies and a re-run fails if the
# path already exists; remove the old output or set a mode explicitly before re-running.
patents.write.json('transformed_data.json')

# Data Warehouse architecture: Star Schema

#### DimPatent
- code_patent
- title 
- source
- link
- abstract

#### DimTitle
- id_title
- title

#### DimCountry
- id_country
- country

#### DimInventor
- id_inventor
- full_name

#### DimAssignee
- id_assignee
- assignee_name
- type


#### Keyword
- id_keyword
- keyword

#### DimTime
- id_time
- year
- month
- day

#### FactPublication
- id_inventor
- id_time
- code_patent


#### FactGrant
- id_assignee
- id_inventor
- id_time
- code_patent

#### FactApplication
- id_applicant
- id_inventor
- id_time
- code_patent


#### FactPriority
- id_inventor
- id_time
- code_patent

#### FactKeyword
- id_title
- id_inventor
- id_country
- id_time
- id_assignee
- id_keyword



### Next step: build the data warehouse (data modeling)

#### Medallion architecture 

  ![schema](images/medallion_architecture.png)

### Star Schema 

  ![schema](images/star_schema.png)