In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Obtain a SparkSession: reuse the active one if present, else build one
# with a local master using all cores.
# NOTE(review): the startup output shows this kernel already provides
# `spark` on YARN, so getOrCreate() most likely returns that session and the
# 'local[*]' master setting has no effect — confirm.
spark = SparkSession.builder.master('local[*]').getOrCreate()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
15,application_1646890912134_0017,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Preprocessing: build itemsets from `war_ukr_rus_w_internal.parquet` and save them together with their source URLs

In [2]:
path = "s3://bdcc-final-project-bucket/war_ukr_rus_w_internal.parquet/"
# Load the input parquet dataset through the generic reader API.
data = spark.read.format('parquet').load(path)
# Register it so the spark.sql() queries below can refer to it as `data`.
data.createOrReplaceTempView('data')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# One row per (MonthYear, Actor1Name, SOURCEURL) with averaged tone and
# Goldstein score, total mentions, and the set of
# "<Actor1>_1,<Actor2>_2,<Actor2CountryCode>-<EVENTDESCRIPTION>" strings
# seen in that group.
# NOTE(review): CONCAT returns NULL when any argument is NULL and
# collect_set skips NULLs, so events missing Actor2Name, Actor2CountryCode
# or EVENTDESCRIPTION contribute no item.
# NOTE(review): the window has no PARTITION BY, so Spark moves every row into
# a single partition to compute it, and ORDER BY (select 1) makes trans_id an
# arbitrary, non-reproducible numbering.
transactions1 = spark.sql("""
    select MonthYear,
           Actor1Name,
           avg(AvgTone) as AvgTone,
           sum(NumMentions) as NumMentions,
           avg(GoldSteinScale) as GoldSteinScale,
           collect_set(
                       concat(
                              Actor1Name,'_1', ',',
                              Actor2Name, '_2', ',',
                              Actor2CountryCode, '-',
                              EVENTDESCRIPTION
                              )
                       ) as itemset,
            row_number() over (order by (select 1)) as trans_id,
            SOURCEURL
        from data
        group by MonthYear, Actor1Name, SOURCEURL
        order by MonthYear asc, Actor1Name asc
        """)

# Split EVERY collected string on '-' (actor/country part vs. event
# description) and keep the distinct pieces. The previous itemset[0] kept a
# single, arbitrary element of the unordered collect_set and silently
# dropped the others.
# NOTE(review): an EVENTDESCRIPTION that itself contains '-' is split into
# several pieces.
transactions2 = transactions1.withColumn(
    'itemset',
    expr("array_distinct(flatten(transform(itemset, s -> split(s, '-'))))"))

transactions2.show(2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+----------+-----------------+-----------+--------------+--------------------+--------+--------------------+
|MonthYear|Actor1Name|          AvgTone|NumMentions|GoldSteinScale|             itemset|trans_id|           SOURCEURL|
+---------+----------+-----------------+-----------+--------------+--------------------+--------+--------------------+
|   201301|     CHINA| 2.36724248240563|        5.0|           0.0|[CHINA_1,RUSSIA_2...|  299465|http://www.newyor...|
|   201301|    MOSCOW|0.969088640789692|       31.0|          -5.0|[MOSCOW_1,WALL ST...|  512305|http://www.global...|
+---------+----------+-----------------+-----------+--------------+--------------------+--------+--------------------+
only showing top 2 rows

In [4]:
# Persist the itemsets. mode('overwrite') keeps the cell re-runnable: the
# default save mode ('errorifexists') raises once this path already exists.
transactions2.write.mode('overwrite').parquet(
    's3://bdcc-final-project-bucket/itemsets_w_urls-2.parquet')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
# Itemsets keyed by (Year, SQLDATE, Actor1Name, SOURCEURL): each row keeps
# the set of "<Actor1>_1, <Actor2>_2, <Actor2CountryCode>-<EVENTDESCRIPTION>"
# strings for that group together with its source URL.
# NOTE(review): CONCAT yields NULL if any argument is NULL and COLLECT_SET
# skips NULLs, so events missing Actor2 fields contribute nothing.
trans_db_query = """
    SELECT
        Year,
        SQLDATE,
        Actor1Name,
        COLLECT_SET(CONCAT(
            Actor1Name, "_1", ", ",
            Actor2Name, "_2", ", ",
            Actor2CountryCode, "-",
            EVENTDESCRIPTION
        )) AS itemset,
        SOURCEURL
    FROM data
    GROUP BY Year, SQLDATE, Actor1Name, SOURCEURL
    """
trans_db = spark.sql(trans_db_query)

# Preview a handful of rows.
trans_db.limit(5).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------+---------------+--------------------+--------------------+
|Year| SQLDATE|     Actor1Name|             itemset|           SOURCEURL|
+----+--------+---------------+--------------------+--------------------+
|2013|20130110|   NORTH DAKOTA|[NORTH DAKOTA_1, ...|http://www.herald...|
|2013|20130304|        UKRAINE|[UKRAINE_1, WASHI...|http://www.csmoni...|
|2013|20130304| UNITED KINGDOM|[UNITED KINGDOM_1...|http://www.newrep...|
|2013|20130402|         RUSSIA|[RUSSIA_1, CHINA_...|http://www.thenew...|
|2013|20130402|THE WHITE HOUSE|[THE WHITE HOUSE_...|http://thehill.co...|
+----+--------+---------------+--------------------+--------------------+

In [34]:
# mode('overwrite') keeps the cell re-runnable; the default save mode raises
# when the output path already exists.
trans_db.write.mode('overwrite').parquet('s3://bdcc-final-project-bucket/itemsets_w_urls-3.parquet')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Retrieve URLs in a Spark dataframe

In [48]:
def df_all_urls(antecedent, consequent, items_to_show='all', truncate=False,
                input_df_path='s3://bdcc-final-project-bucket/itemsets_w_urls-3.parquet'):
    """Print and return the rows whose itemset matches an antecedent/consequent pair.

    Each row's `itemset` array is joined into one ``", "``-separated string
    and matched with SQL ``LIKE`` against ``antecedent + ', ' + consequent``,
    so both terms may use the ``%`` and ``_`` wildcards.

    Parameters
    ----------
    antecedent : str
        LIKE pattern for the antecedent term.
    consequent : str
        LIKE pattern for the consequent term.
    items_to_show : 'all' or int, default 'all'
        Number of matching rows to print. ``'all'`` prints every match
        (this runs an extra count job).
    truncate : bool, default False
        Passed to ``DataFrame.show``; ``True`` truncates long cell values.
    input_df_path : str, optional
        Parquet dataset to search. It must contain an array column
        `itemset` and, for URL retrieval, a `SOURCEURL` column.

    Returns
    -------
    pyspark.sql.DataFrame
        The matching rows (all input columns, with `itemset` replaced by the
        joined string), including the `SOURCEURL`s.
    """
    input_df = spark.read.parquet(input_df_path)
    input_df = input_df.withColumn('itemset', concat_ws(', ', 'itemset'))
    # NOTE(review): the joined items themselves may contain ', ', so a match
    # can span two different items of the same row.
    search_term = antecedent + ', ' + consequent
    matches = input_df.filter(col('itemset').like(search_term))
    if items_to_show == 'all':
        # DataFrame.show() prints only 20 rows unless told otherwise, so
        # count the matches to genuinely print all of them.
        matches.show(matches.count(), truncate=truncate)
    else:
        matches.show(items_to_show, truncate=truncate)
    return matches

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
# Sample: print the first 5 matches with untruncated column values.
# '%' is the SQL LIKE wildcard, so the patterns match substrings.
df_all_urls(antecedent='%RUSSIA%Use conventional military force, not specifie%',
            consequent='%RUS%',
            items_to_show=5,
            truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+
|Year|SQLDATE |Actor1Name |itemset                                                                                                                                                                                                                            |SOURCEURL                                                                                                             |
+----+--------+-----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Save the matching URLs as a gzip-compressed CSV in the S3 bucket

In [57]:
def csv_all_urls(antecedent, consequent, path, mode='errorifexists',
                 input_df_path='s3://bdcc-final-project-bucket/itemsets_w_urls-3.parquet'):
    """Write the rows whose itemset matches an antecedent/consequent pair as CSV.

    Each row's `itemset` array is joined into one ``", "``-separated string
    and matched with SQL ``LIKE`` against ``antecedent + ', ' + consequent``,
    so both terms may use the ``%`` and ``_`` wildcards. The matching rows
    (all input columns, with `itemset` as the joined string, including
    `SOURCEURL`) are written as gzip-compressed CSV with a header row.

    Parameters
    ----------
    antecedent : str
        LIKE pattern for the antecedent term.
    consequent : str
        LIKE pattern for the consequent term.
    path : str
        Output directory (e.g. an ``s3://`` prefix). Spark writes a directory
        of part files there; ``repartition(1)`` makes it a single part file.
    mode : str, default 'errorifexists'
        Spark save mode (``'errorifexists'``, ``'overwrite'``, ``'append'``,
        ``'ignore'``). The default is Spark's own default and fails if
        `path` already exists; pass ``'overwrite'`` to make re-runs succeed.
    input_df_path : str, optional
        Parquet dataset to search; it must contain an array column `itemset`.

    Returns
    -------
    None
        The result is written to `path`; nothing is returned.
    """
    input_df = spark.read.parquet(input_df_path)
    input_df = input_df.withColumn('itemset', concat_ws(', ', 'itemset'))
    search_term = antecedent + ', ' + consequent
    output_df = input_df.filter(col('itemset').like(search_term))
    # repartition(1) (rather than coalesce(1)) yields one output file while
    # letting the scan and filter above still run in parallel.
    (output_df.repartition(1)
        .write.mode(mode)
        .option('header', True)
        .option('compression', 'gzip')
        .csv(path))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [59]:
# Sample: write every match to s3://bdcc-final-project-bucket/russia-conv-2.
# NOTE: with the default save mode this raises if that path already exists.
csv_all_urls(antecedent='%RUSSIA%Use conventional military force, not specifie%',
             consequent='%RUS%',
             path='s3://bdcc-final-project-bucket/russia-conv-2')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…