In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
import re
import pandas as pd
import numpy as np
import scipy as sp
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
from pyspark.sql import functions
from pyspark.sql import types

import findspark
findspark.init()

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.functions import min

from pyspark.sql import SparkSession
from pyspark import SparkContext

# Attach to the active SparkSession (or start one) and keep a handle to its
# SparkContext for the RDD/broadcast APIs.
spark = (SparkSession
         .builder
         .getOrCreate())
sc = spark.sparkContext



In [29]:
import re

import os
import shutil

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import unix_timestamp, udf, to_date, to_timestamp
from pyspark.sql.types import *
from datetime import datetime

# Column layout of the tab-separated event export files (`*.export.CSV`),
# in file order. Every column is declared nullable; the second element of
# each pair is the Spark type the raw value is parsed as.
EVENTS_SCHEMA = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("GLOBALEVENTID", LongType()),
        ("Day_DATE", StringType()),
        ("MonthYear_Date", StringType()),
        ("Year_Date", StringType()),
        ("FractionDate", FloatType()),
        ("Actor1Code", StringType()),
        ("Actor1Name", StringType()),
        ("Actor1CountryCode", StringType()),
        ("Actor1KnownGroupCode", StringType()),
        ("Actor1EthnicCode", StringType()),
        ("Actor1Religion1Code", StringType()),
        ("Actor1Religion2Code", StringType()),
        ("Actor1Type1Code", StringType()),
        ("Actor1Type2Code", StringType()),
        ("Actor1Type3Code", StringType()),
        ("Actor2Code", StringType()),
        ("Actor2Name", StringType()),
        ("Actor2CountryCode", StringType()),
        ("Actor2KnownGroupCode", StringType()),
        ("Actor2EthnicCode", StringType()),
        ("Actor2Religion1Code", StringType()),
        ("Actor2Religion2Code", StringType()),
        ("Actor2Type1Code", StringType()),
        ("Actor2Type2Code", StringType()),
        ("Actor2Type3Code", StringType()),
        ("IsRootEvent", LongType()),
        ("EventCode", StringType()),
        ("EventBaseCode", StringType()),
        ("EventRootCode", StringType()),
        ("QuadClass", LongType()),
        ("GoldsteinScale", FloatType()),
        ("NumMentions", LongType()),
        ("NumSources", LongType()),
        ("NumArticles", LongType()),
        ("AvgTone", FloatType()),
        ("Actor1Geo_Type", LongType()),
        ("Actor1Geo_FullName", StringType()),
        ("Actor1Geo_CountryCode", StringType()),
        ("Actor1Geo_ADM1Code", StringType()),
        ("Actor1Geo_ADM2Code", StringType()),
        ("Actor1Geo_Lat", FloatType()),
        ("Actor1Geo_Long", FloatType()),
        ("Actor1Geo_FeatureID", StringType()),
        ("Actor2Geo_Type", LongType()),
        ("Actor2Geo_FullName", StringType()),
        ("Actor2Geo_CountryCode", StringType()),
        ("Actor2Geo_ADM1Code", StringType()),
        ("Actor2Geo_ADM2Code", StringType()),
        ("Actor2Geo_Lat", FloatType()),
        ("Actor2Geo_Long", FloatType()),
        ("Actor2Geo_FeatureID", StringType()),
        ("ActionGeo_Type", LongType()),
        ("ActionGeo_FullName", StringType()),
        ("ActionGeo_CountryCode", StringType()),
        ("ActionGeo_ADM1Code", StringType()),
        ("ActionGeo_ADM2Code", StringType()),
        ("ActionGeo_Lat", FloatType()),
        ("ActionGeo_Long", FloatType()),
        ("ActionGeo_FeatureID", StringType()),
        ("DATEADDED", LongType()),
        ("SOURCEURL", StringType()),
    ]
])


# Column layout of the tab-separated mention files (`*.mentions.CSV`), in
# file order; all columns nullable.
MENTIONS_SCHEMA = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("GLOBALEVENTID", LongType()),
        ("EventTimeDate", LongType()),
        ("MentionTimeDate", LongType()),
        ("MentionType", LongType()),
        ("MentionSourceName", StringType()),
        ("MentionIdentifier", StringType()),
        ("SentenceID", LongType()),
        ("Actor1CharOffset", LongType()),
        ("Actor2CharOffset", LongType()),
        ("ActionCharOffset", LongType()),
        ("InRawText", LongType()),
        ("Confidence", LongType()),
        ("MentionDocLen", LongType()),
        ("MentionDocTone", FloatType()),
        ("MentionDocTranslationInfo", StringType()),
        ("Extras", StringType()),
    ]
])

OUT_DIR = 'output'
DATA_DIR = '.'


def save(df, path='df_save'):
    """Write ``df`` as CSV into the directory ``path``, as a single part-file.

    The frame is repartitioned to one partition so Spark emits a single
    ``part-*`` CSV file, and any existing output at ``path`` is overwritten.

    Args:
        df: Spark DataFrame to persist.
        path: Output directory. Defaults to ``'df_save'`` (relative to the
            working directory) for backward compatibility. Pass a distinct
            path to avoid clobbering an earlier save.
    """
    df.repartition(1).write.mode('overwrite').csv(path)


# Reuse the active session; parse and display timestamps in UTC.
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.session.timeZone', 'UTC')
sc = spark.sparkContext

# Raw tables: tab-separated files typed by the explicit schemas above
# (the reader's default of no header row is kept).
events_raw = (
    spark.read
    .schema(EVENTS_SCHEMA)
    .option("sep", "\t")
    .csv(os.path.join(DATA_DIR, "*.export.CSV"))
)
mentions_raw = (
    spark.read
    .schema(MENTIONS_SCHEMA)
    .option("sep", "\t")
    .csv(os.path.join(DATA_DIR, "*.mentions.CSV"))
)



In [42]:
# Trim the raw events to the analysis columns and derive day/month numbers
# from the yyyyMMdd day string. This is done in a single projection so every
# column referenced here exists on `events_raw`. The previous two-step select
# dropped the Actor*/Geo country-code columns in the first step, so the second
# step could not resolve them on a fresh kernel.
events = (
    events_raw
    .withColumn('date', to_date(events_raw.Day_DATE.cast('String'), 'yyyyMMdd'))
    .select('GLOBALEVENTID',
            'date',
            dayofmonth('date').alias('Day_Date'),
            month('date').alias('Month_Date'),
            'Year_Date',
            'FractionDate',
            'Actor1CountryCode',
            'Actor2CountryCode',
            'QuadClass',
            'GoldsteinScale',
            'AvgTone',
            'Actor1Geo_CountryCode',
            'Actor2Geo_CountryCode',
            'ActionGeo_CountryCode')
)

# Mentions: keep the event key, parse the yyyyMMddHHmmss integer stamps into
# timestamps (interpreted in the session time zone), plus type and confidence.
mentions = mentions_raw.select(
    'GLOBALEVENTID',
    to_timestamp(mentions_raw.EventTimeDate.cast('String'), 'yyyyMMddHHmmss').alias('EventTimeDate'),
    to_timestamp(mentions_raw.MentionTimeDate.cast('String'), 'yyyyMMddHHmmss').alias('MentionTimeDate'),
    'MentionType',
    'Confidence',
)

# Expose both frames to spark.sql(); createOrReplaceTempView replaces the
# deprecated registerTempTable.
events.createOrReplaceTempView('events')
mentions.createOrReplaceTempView('mentions')

events.printSchema()


root
 |-- GLOBALEVENTID: long (nullable = true)
 |-- date: date (nullable = true)
 |-- Day_Date: integer (nullable = true)
 |-- Month_Date: integer (nullable = true)
 |-- Year_Date: string (nullable = true)
 |-- FractionDate: float (nullable = true)
 |-- Actor1CountryCode: string (nullable = true)
 |-- Actor2CountryCode: string (nullable = true)
 |-- QuadClass: long (nullable = true)
 |-- GoldsteinScale: float (nullable = true)
 |-- AvgTone: float (nullable = true)
 |-- Actor1Geo_CountryCode: string (nullable = true)
 |-- Actor2Geo_CountryCode: string (nullable = true)
 |-- ActionGeo_CountryCode: string (nullable = true)



Unnamed: 0,GLOBALEVENTID,date,Day_Date,Month_Date,Year_Date,FractionDate,Actor1CountryCode,Actor2CountryCode,QuadClass,GoldsteinScale,AvgTone,Actor1Geo_CountryCode,Actor2Geo_CountryCode,ActionGeo_CountryCode
0,410479387,2014-02-19,19,2,2014,2014.134155,,AUS,1,2.8,2.194357,,AS,AS
1,410479388,2014-02-19,19,2,2014,2014.134155,,,1,1.9,-2.571166,,US,US
2,410479389,2014-02-19,19,2,2014,2014.134155,,,1,1.9,-6.820567,,BR,BR
3,410479390,2014-02-19,19,2,2014,2014.134155,AUS,,1,1.9,2.194357,AS,,AS
4,410479391,2014-02-19,19,2,2014,2014.134155,BRA,,1,3.0,-6.820567,BR,,BR


In [20]:
# Total mention rows per action country, least-mentioned first. Events with
# no ActionGeo country code, or with no mention rows (inner join), drop out.
loc_events = events.dropna(subset=['ActionGeo_CountryCode'])
mentions_count = mentions.groupBy('GLOBALEVENTID').count()
country_count = (
    mentions_count
    .join(loc_events, on='GLOBALEVENTID')
    .groupBy('ActionGeo_CountryCode')
    .sum('count')
    .orderBy('sum(count)')
)

country_count.toPandas().head(100)

# events_raw.filter(events_raw['ActionGeo_CountryCode'] == 'CH').toPandas().head()

Unnamed: 0,ActionGeo_CountryCode,sum(count)
0,RO,1
1,SL,1
2,NL,1
3,BX,1
4,BC,1
5,GB,1
6,MO,1
7,AE,1
8,HA,1
9,KU,1


In [17]:
# FIPS 10-4 country code -> ISO 3166-1 alpha-2 code. Only codes that differ
# between the two standards are listed; anything absent is assumed identical
# (e.g. 'US', 'FR', 'IN') and passed through unchanged by fips2iso.
FIPS_TO_ISO = {
    'AG': 'DZ',
    'AQ': 'AS',
    'AN': 'AD',
    'AV': 'AI',
    'AY': 'AQ',
    'AC': 'AG',
    'AA': 'AW',
    'AS': 'AU',
    'AU': 'AT',
    'AJ': 'AZ',
    'BF': 'BS',
    'BA': 'BH',
    'BG': 'BD',
    'BO': 'BY',
    'BH': 'BZ',
    'BN': 'BJ',
    'BD': 'BM',
    'BL': 'BO',
    'BK': 'BA',
    'BC': 'BW',
    'BX': 'BN',
    'BU': 'BG',
    'UV': 'BF',
    'BY': 'BI',
    'CB': 'KH',
    'CJ': 'KY',
    'CT': 'CF',
    'CD': 'TD',
    'CI': 'CL',
    'CH': 'CN',
    'KT': 'CX',
    'CK': 'CC',
    'CN': 'KM',
    'CF': 'CG',
    'CG': 'CD',
    'CW': 'CK',
    'CS': 'CR',
    'IV': 'CI',
    'UC': 'CW',
    'EZ': 'CZ',
    'DO': 'DM',
    'DR': 'DO',
    'ES': 'SV',
    'EK': 'GQ',
    'EN': 'EE',
    'FP': 'PF',
    'FS': 'TF',
    'GB': 'GA',
    'GA': 'GM',
    'GJ': 'GD',
    'GQ': 'GU',
    'GK': 'GG',
    'GV': 'GN',
    'PU': 'GW',
    'HA': 'HT',
    'VT': 'VA',
    'HO': 'HN',
    'IC': 'IS',
    'IZ': 'IQ',
    'IS': 'IL',
    'JA': 'JP',
    'KR': 'KI',
    'KN': 'KP',
    'KS': 'KR',
    'KU': 'KW',
    'LG': 'LV',
    'LE': 'LB',
    'LT': 'LS',
    'LI': 'LR',
    'LS': 'LI',
    'LH': 'LT',
    'MC': 'MO',
    'MA': 'MG',
    'MI': 'MW',
    'RM': 'MH',
    'MB': 'MQ',
    'MP': 'MU',
    'MF': 'YT',
    'MN': 'MC',
    'MG': 'MN',
    'MJ': 'ME',
    'MH': 'MS',
    'MO': 'MA',
    'BM': 'MM',
    'WA': 'NA',
    'NU': 'NI',
    'NG': 'NE',
    'NI': 'NG',
    'NE': 'NU',
    'CQ': 'MP',
    'MU': 'OM',
    'PS': 'PW',
    'WE': 'PS',
    'PM': 'PA',
    'PP': 'PG',
    'PA': 'PY',
    'RP': 'PH',
    'PC': 'PN',
    'PO': 'PT',
    'RQ': 'PR',
    'RS': 'RU',
    'TB': 'BL',
    'SC': 'KN',
    'ST': 'LC',
    'RN': 'MF',
    'SB': 'PM',
    'TP': 'ST',
    'SG': 'SN',
    'RI': 'RS',
    'SE': 'SC',
    'SN': 'SG',
    'NN': 'SX',
    'BP': 'SB',
    'SF': 'ZA',
    'SX': 'GS',
    'OD': 'SS',
    'SP': 'ES',
    'CE': 'LK',
    'SU': 'SD',
    'NS': 'SR',
    'SV': 'SJ',
    'WZ': 'SZ',
    'SW': 'SE',
    'SZ': 'CH',
    'TI': 'TJ',
    'TT': 'TL',
    'TO': 'TG',
    'TL': 'TK',
    'TN': 'TO',
    'TD': 'TT',
    'TS': 'TN',
    'TU': 'TR',
    'TX': 'TM',
    'TK': 'TC',
    'UK': 'GB',
    'NH': 'VU',
    'VI': 'VG',
    'VQ': 'VI',
    'WI': 'EH',
    'YM': 'YE',
    'ZA': 'ZM',
    'ZI': 'ZW',
    # Previously missing. Without these entries the code fell through
    # unchanged, and e.g. Germany ('GM') was reported as ISO 'GM' (Gambia)
    # and Georgia ('GG') as ISO 'GG' (Guernsey).
    'GM': 'DE',  # Germany
    'DA': 'DK',  # Denmark
    'EI': 'IE',  # Ireland
    'UP': 'UA',  # Ukraine
    'VM': 'VN',  # Vietnam
    'LO': 'SK',  # Slovakia
    'GG': 'GE',  # Georgia
    'FG': 'GF',  # French Guiana
    'GZ': 'PS',  # Gaza Strip (West Bank 'WE' already maps to 'PS')
    'JN': 'SJ',  # Jan Mayen
}

# Ship the table to executors once rather than with every UDF task.
fips_iso = sc.broadcast(FIPS_TO_ISO)

def fips2iso(fips, mapping=None):
    """Translate a FIPS 10-4 country code into its ISO 3166-1 alpha-2 code.

    Codes absent from the mapping, including ``None`` for missing values,
    are returned unchanged, so codes that are identical in both standards
    pass straight through.

    Args:
        fips: FIPS country code, or ``None``.
        mapping: Optional dict of FIPS -> ISO codes. When omitted, the
            contents of the ``fips_iso`` broadcast variable are used.

    Returns:
        ``mapping[fips]`` if ``fips`` is a key of the mapping, else ``fips``.
    """
    if mapping is None:
        # A Broadcast exposes its payload only through ``.value``. It has no
        # ``.keys()`` and no ``.values`` attribute, so indexing it directly
        # raised AttributeError inside the UDF.
        mapping = fips_iso.value
    return mapping.get(fips, fips)
    
# Spark UDF wrapping fips2iso; yields a string column of ISO alpha-2 codes.
isoCodes = functions.udf(fips2iso, returnType=types.StringType())

# Tag each event with the ISO code of its ActionGeo country.
# NOTE(review): 'coutryCode_iso2' looks like a typo for 'countryCode_iso2';
# kept as-is in case later cells read the column by this name.
foo = events.withColumn(
    'coutryCode_iso2',
    isoCodes('ActionGeo_CountryCode'),
)
fips_iso

<pyspark.broadcast.Broadcast at 0x157a687fa20>