# Project 3 Silver Layer

## Setup

###*Imports*

In [None]:
from pyspark.sql.functions import col, concat, concat_ws, lit
from pyspark.sql.window import *
from pyspark.sql.functions import row_number

###*Set Authentication Credentials*

In [None]:
# Service-principal credentials, read from the 'team1-keyvault' secret scope
# in the order client-id, tenant-id, client-secret.
client_id, tenant_id, service_credential = (
    dbutils.secrets.get(scope='team1-keyvault', key=secret_name)
    for secret_name in ('client-id', 'tenant-id', 'client-secret')
)

# Storage account / container (abfss:// = ADLS Gen2 endpoint) and the
# bronze/silver layer folders inside the container.
storage_acct_name = '20230821team1sa'
cont_name = 'project3-team1'
base_path = f'abfss://{cont_name}@{storage_acct_name}.dfs.core.windows.net'
bronze_path = f'{base_path}/BronzeLayer'
silver_path = f'{base_path}/SilverLayer'

# Bronze sub-folders this notebook reads from: reference tables and the
# partitioned main tables.
ext_tables_path = f'{bronze_path}/ExternalTables'
main_tables_path = f'{bronze_path}/BronzePartitions'

In [None]:
# Session-level OAuth configuration for the storage account, using the
# client id / secret / tenant read from the secret scope.
_adls_account_host = f'{storage_acct_name}.dfs.core.windows.net'
_adls_oauth_conf = {
    f'fs.azure.account.auth.type.{_adls_account_host}': 'OAuth',
    f'fs.azure.account.oauth.provider.type.{_adls_account_host}': 'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider',
    f'fs.azure.account.oauth2.client.id.{_adls_account_host}': client_id,
    f'fs.azure.account.oauth2.client.secret.{_adls_account_host}': service_credential,
    f'fs.azure.account.oauth2.client.endpoint.{_adls_account_host}': f'https://login.microsoftonline.com/{tenant_id}/oauth2/token',
}
# Dicts keep insertion order, so the settings are applied in the order above.
for _conf_key, _conf_value in _adls_oauth_conf.items():
    spark.conf.set(_conf_key, _conf_value)

###*Load Main Tables*

In [None]:
# Partitioned bronze copies of the event-mention and event tables.
event_mentions_df, event_df = (
    spark.read.parquet(f'{main_tables_path}/{folder}')
    for folder in ('EventMentionPartition', 'EventPartition')
)

###*Load External Tables*

In [None]:
# Reference / lookup tables stored under the bronze ExternalTables folder.
(
    countries_df,
    cameo_ethnic_df,
    cameo_eventcodes_df,
    quad_class_df,
    cameo_knowngroup_df,
    cameo_religion_df,
    cameo_type_df,
) = (
    spark.read.parquet(f'{ext_tables_path}/{table_name}')
    for table_name in (
        'Country.merged',
        'CAMEO.ethnic',
        'CAMEO.eventcodes',
        'CAMEO.quadclass',
        'CAMEO.knowngroup',
        'CAMEO.religion',
        'CAMEO.type',
    )
)

## Create Tables

###*Create Actor GEO Table*

In [None]:
# Location dimension built from both actor slots: each slot's geo columns are
# aliased to slot-neutral ActorGeo_* names so the two projections can be stacked.
actor_geo_df, actor2_geo_df = (
    event_df.select(
        col(f'Actor{slot}Geo_CountryCode').alias('ActorGeo_CountryCode'),
        col(f'Actor{slot}Geo_Lat').alias('ActorGeo_Lat'),
        col(f'Actor{slot}Geo_Long').alias('ActorGeo_Long'),
    )
    for slot in (1, 2)
)

# union() matches columns by position (both projections use the same order);
# dropDuplicates() keeps one row per distinct location, including an all-NULL
# row if any event has no geo information.
actor_geo_df = actor_geo_df.union(actor2_geo_df).dropDuplicates()

In [None]:
# Assign a sequential surrogate key (ActorGeo_ID) to every distinct location.
#
# Order by the location columns themselves rather than a constant. After
# dropDuplicates() every (country, lat, long) row is unique, so this ordering
# is total and row_number() is reproducible. With a constant sort key the IDs
# would depend on the nondeterministic row order out of the dedup shuffle.
# actor_geo_df is not cached: it is re-evaluated by the event joins and again
# by the ACTOR_GEO write, so unstable IDs could disagree between EVENT and
# ACTOR_GEO.
#
# A separate name is used for the window spec so the imported `Window` class
# is not shadowed (re-running this cell used to fail with
# "'WindowSpec' object is not callable").
#
# NOTE: a window without partitionBy() moves all rows into a single
# partition; acceptable only while the number of distinct locations is small.
actor_geo_window = Window.orderBy(
    'ActorGeo_CountryCode', 'ActorGeo_Lat', 'ActorGeo_Long'
)
actor_geo_df = actor_geo_df.withColumn(
    'ActorGeo_ID', row_number().over(actor_geo_window)
)

In [None]:
def _geo_match(prefix):
    """Return the null-safe join condition between event_df's `prefix`_* geo columns and actor_geo_df."""
    # eqNullSafe treats NULL == NULL as a match, so events with missing geo
    # fields pair with the corresponding NULL-valued location row.
    return [
        event_df[f'{prefix}_CountryCode'].eqNullSafe(actor_geo_df.ActorGeo_CountryCode)
        & event_df[f'{prefix}_Lat'].eqNullSafe(actor_geo_df.ActorGeo_Lat)
        & event_df[f'{prefix}_Long'].eqNullSafe(actor_geo_df.ActorGeo_Long)
    ]


# Both conditions are built against event_df *before* it is reassigned below.
condition1 = _geo_match('Actor1Geo')
condition2 = _geo_match('Actor2Geo')

# Location attribute columns brought in by each join; only the ID is kept.
_geo_attr_cols = ('ActorGeo_CountryCode', 'ActorGeo_Lat', 'ActorGeo_Long')

# Left joins keep every event; each join contributes one slot's location key.
event_df = (
    event_df.join(actor_geo_df, on=condition1, how='left')
    .withColumnRenamed('ActorGeo_ID', 'Actor1Geo_ID')
    .drop(*_geo_attr_cols)
)
event_df = (
    event_df.join(actor_geo_df, on=condition2, how='left')
    .withColumnRenamed('ActorGeo_ID', 'Actor2Geo_ID')
    .drop(*_geo_attr_cols)
)

###*Create Actor Table*

####*Add actor IDs*

In [None]:
# Per-slot actor key: the actor code concatenated with the actor name.
# NOTE: concat() returns NULL if either input is NULL, so an actor missing its
# code or its name gets a NULL ID; such rows are filtered out when the actor
# table is built. The parts are joined without a separator; confirm that
# distinct code/name pairs cannot collide (e.g. "AB"+"C" vs "A"+"BC").
event_df = event_df.withColumns(
    {
        f'Actor{slot}ID': concat(col(f'Actor{slot}Code'), col(f'Actor{slot}Name'))
        for slot in (1, 2)
    }
)

In [None]:
# Actor attributes as recorded in the Actor1 slot, for rows that have an ID.
actor1 = event_df.where(col('Actor1ID').isNotNull()).select(
    [
        f'Actor1{attr}'
        for attr in (
            'ID',
            'Name',
            'CountryCode',
            'KnownGroupCode',
            'EthnicCode',
            'Religion1Code',
            'Religion2Code',
            'Type1Code',
            'Type2Code',
            'Type3Code',
        )
    ]
)

In [None]:
# Drop the slot number (Actor1X -> Actor_X) so Actor1 rows share one schema
# with Actor2 rows and the two can be unioned.
actor1 = actor1.withColumnsRenamed(
    {
        f'Actor1{attr}': f'Actor_{attr}'
        for attr in (
            'ID',
            'Name',
            'CountryCode',
            'KnownGroupCode',
            'EthnicCode',
            'Religion1Code',
            'Religion2Code',
            'Type1Code',
            'Type2Code',
            'Type3Code',
        )
    }
)

In [None]:
# Actor attributes as recorded in the Actor2 slot, for rows that have an ID.
actor2 = event_df.where(col('Actor2ID').isNotNull()).select(
    [
        f'Actor2{attr}'
        for attr in (
            'ID',
            'Name',
            'CountryCode',
            'KnownGroupCode',
            'EthnicCode',
            'Religion1Code',
            'Religion2Code',
            'Type1Code',
            'Type2Code',
            'Type3Code',
        )
    ]
)

In [None]:
# Drop the slot number (Actor2X -> Actor_X) so Actor2 rows share one schema
# with Actor1 rows and the two can be unioned.
actor2 = actor2.withColumnsRenamed(
    {
        f'Actor2{attr}': f'Actor_{attr}'
        for attr in (
            'ID',
            'Name',
            'CountryCode',
            'KnownGroupCode',
            'EthnicCode',
            'Religion1Code',
            'Religion2Code',
            'Type1Code',
            'Type2Code',
            'Type3Code',
        )
    }
)

In [None]:
# Stack both slots (positional union; both frames share the same column order)
# and keep each distinct attribute combination once.
# NOTE(review): deduplication is over all columns, so one Actor_ID seen with
# differing attributes would yield several rows; confirm that cannot happen.
actor_df = actor1.union(actor2).dropDuplicates()

####*Combine external actor code tables*

#####*Combine ActorKnownGroup*

In [None]:
# Replace the known-group code with its label from CAMEO.knowngroup; the left
# join keeps actors whose code has no match (label becomes NULL).
actor_df = actor_df.join(
    cameo_knowngroup_df,
    on=[cameo_knowngroup_df.code == actor_df.Actor_KnownGroupCode],
    how='left',
)
actor_df = actor_df.drop('code', 'Actor_KnownGroupCode')
actor_df = actor_df.withColumnRenamed('label', 'Actor_KnownGroup')

#####*Combine ActorEthnicCode*

In [None]:
# Replace the ethnic code with its label from CAMEO.ethnic (left join, so
# unmatched codes yield a NULL label).
actor_df = (
    actor_df.join(
        cameo_ethnic_df,
        on=[cameo_ethnic_df.code == actor_df.Actor_EthnicCode],
        how='left',
    )
    .drop('Actor_EthnicCode', 'code')
    .withColumnRenamed('label', 'Actor_Ethnic')
)

#####*Combine ActorType*

In [None]:
# Resolve each of the three actor-type codes to its label from CAMEO.type.
# The same lookup table is joined once per slot; each join's `code` column is
# dropped and its `label` renamed, so the next join sees no clashing names.
for _type_slot in ('Type1', 'Type2', 'Type3'):
    actor_df = (
        actor_df.join(
            cameo_type_df,
            on=[actor_df[f'Actor_{_type_slot}Code'] == cameo_type_df.code],
            how='left',
        )
        .drop(f'Actor_{_type_slot}Code', 'code')
        .withColumnRenamed('label', f'Actor_{_type_slot}')
    )

#####*Combine ActorReligion*

In [None]:
# Replace the primary religion code with its label from CAMEO.religion.
actor_df = (
    actor_df.join(
        cameo_religion_df,
        on=[cameo_religion_df.code == actor_df.Actor_Religion1Code],
        how='left',
    )
    .drop('Actor_Religion1Code', 'code')
    .withColumnRenamed('label', 'Actor_Religion1')
)

In [None]:
# Replace the secondary religion code with its label from CAMEO.religion.
actor_df = (
    actor_df.join(
        cameo_religion_df,
        on=[cameo_religion_df.code == actor_df.Actor_Religion2Code],
        how='left',
    )
    .drop('Actor_Religion2Code', 'code')
    .withColumnRenamed('label', 'Actor_Religion2')
)

###*Create Event Mention Table*

In [None]:
# Sequential surrogate key for mentions.
# NOTE(review): ordering by a constant makes the ID assignment depend on input
# row order, so IDs are not reproducible across runs, and a window without
# partitionBy() pulls every row into one partition. Consider ordering by a
# natural key of the mentions table.
window = Window.orderBy(lit('A'))

# Drop the columns not carried into the silver table, then attach the key.
event_mentions_df = (
    event_mentions_df
    .drop(
        'Extras',
        'Confidence',
        'InRawText',
        'ActionCharOffset',
        'SentenceID',
        'EventTimeDate',
        'MentionDocTranslationInfo',
    )
    .withColumn('EventMentionID', row_number().over(window))
)

###*Create Event Table*

In [None]:
# Final EVENT fact table: event measures plus keys into ACTOR (Actor1ID /
# Actor2ID, the same values actor_df's Actor_ID is built from) and ACTOR_GEO
# (Actor1Geo_ID / Actor2Geo_ID from the location joins).
# Reuse the Actor1ID / Actor2ID columns already on event_df instead of a
# second copy of the concat(code, name) expression, so the EVENT and ACTOR
# keys come from a single definition and cannot drift apart.
event_df = event_df.select(
    'GLOBALEVENTID',
    'SQLDATE',
    'Actor1ID',
    'Actor1Geo_ID',
    'Actor2ID',
    'Actor2Geo_ID',
    'EventCode',
    'QuadClass',
    'GoldsteinScale',
    'AvgTone',
    'SOURCEURL',
)

## Write Data

In [None]:
# Silver outputs, written in this order with overwrite semantics. The event and
# mention tables are repartitioned (10 and 24 partitions) to control the
# number of output partitions; the rest are written as-is.
_silver_tables = [
    ('EVENT', event_df.repartition(10)),
    ('EVENT_MENTIONS', event_mentions_df.repartition(24)),
    ('ACTOR', actor_df),
    ('ACTOR_GEO', actor_geo_df),
    ('QUAD_CLASS', quad_class_df),
    ('COUNTRIES', countries_df),
    ('CAMEO_EVENTCODES', cameo_eventcodes_df),
]
for _table_name, _table_df in _silver_tables:
    _table_df.write.mode('overwrite').parquet(f'{silver_path}/{_table_name}')