In [3]:
import matplotlib.pyplot as plt
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import scipy as sp

from matplotlib.pyplot import figure

from pyspark.sql.types import DateType
from datetime import datetime

spark = SparkSession.builder.getOrCreate()

In [82]:
EVENTS_SCHEMA = StructType([
    StructField("GLOBALEVENTID_event",LongType(),True),
    StructField("Day_DATE",StringType(),True),
    StructField("MonthYear_Date",StringType(),True),
    StructField("Year_Date",StringType(),True),
    StructField("FractionDate",FloatType(),True),
    StructField("Actor1Code",StringType(),True),
    StructField("Actor1Name",StringType(),True),
    StructField("Actor1CountryCode",StringType(),True),
    StructField("Actor1KnownGroupCode",StringType(),True),
    StructField("Actor1EthnicCode",StringType(),True),
    StructField("Actor1Religion1Code",StringType(),True),
    StructField("Actor1Religion2Code",StringType(),True),
    StructField("Actor1Type1Code",StringType(),True),
    StructField("Actor1Type2Code",StringType(),True),
    StructField("Actor1Type3Code",StringType(),True),
    StructField("Actor2Code",StringType(),True),
    StructField("Actor2Name",StringType(),True),
    StructField("Actor2CountryCode",StringType(),True),
    StructField("Actor2KnownGroupCode",StringType(),True),
    StructField("Actor2EthnicCode",StringType(),True),
    StructField("Actor2Religion1Code",StringType(),True),
    StructField("Actor2Religion2Code",StringType(),True),
    StructField("Actor2Type1Code",StringType(),True),
    StructField("Actor2Type2Code",StringType(),True),
    StructField("Actor2Type3Code",StringType(),True),
    StructField("IsRootEvent",LongType(),True),
    StructField("EventCode",StringType(),True),
    StructField("EventBaseCode",StringType(),True),
    StructField("EventRootCode",StringType(),True),
    StructField("QuadClass",LongType(),True),
    StructField("GoldsteinScale",FloatType(),True),
    StructField("NumMentions",LongType(),True),
    StructField("NumSources",LongType(),True),
    StructField("NumArticles",LongType(),True),
    StructField("AvgTone",FloatType(),True),
    StructField("Actor1Geo_Type",LongType(),True),
    StructField("Actor1Geo_FullName",StringType(),True),
    StructField("Actor1Geo_CountryCode",StringType(),True),
    StructField("Actor1Geo_ADM1Code",StringType(),True),
    StructField("Actor1Geo_ADM2Code",StringType(),True),
    StructField("Actor1Geo_Lat",FloatType(),True),
    StructField("Actor1Geo_Long",FloatType(),True),
    StructField("Actor1Geo_FeatureID",StringType(),True),
    StructField("Actor2Geo_Type",LongType(),True),
    StructField("Actor2Geo_FullName",StringType(),True),
    StructField("Actor2Geo_CountryCode",StringType(),True),
    StructField("Actor2Geo_ADM1Code",StringType(),True),
    StructField("Actor2Geo_ADM2Code",StringType(),True),
    StructField("Actor2Geo_Lat",FloatType(),True),
    StructField("Actor2Geo_Long",FloatType(),True),
    StructField("Actor2Geo_FeatureID",StringType(),True),
    StructField("ActionGeo_Type",LongType(),True),
    StructField("ActionGeo_FullName",StringType(),True),
    StructField("ActionGeo_CountryCode",StringType(),True),
    StructField("ActionGeo_ADM1Code",StringType(),True),
    StructField("ActionGeo_ADM2Code",StringType(),True),
    StructField("ActionGeo_Lat",FloatType(),True),
    StructField("ActionGeo_Long",FloatType(),True),
    StructField("ActionGeo_FeatureID",StringType(),True),
    StructField("DATEADDED",LongType(),True),
    StructField("SOURCEURL",StringType(),True)
    ])

MENTIONS_SCHEMA = StructType([
    StructField("GLOBALEVENTID",LongType(),True),
    StructField("EventTimeDate",LongType(),True),
    StructField("MentionTimeDate",LongType(),True),
    StructField("MentionType",LongType(),True),
    StructField("MentionSourceName",StringType(),True),
    StructField("MentionIdentifier",StringType(),True),
    StructField("SentenceID",LongType(),True),
    StructField("Actor1CharOffset",LongType(),True),
    StructField("Actor2CharOffset",LongType(),True),
    StructField("ActionCharOffset",LongType(),True),
    StructField("InRawText",LongType(),True),
    StructField("Confidence",LongType(),True),
    StructField("MentionDocLen",LongType(),True),
    StructField("MentionDocTone",FloatType(),True),
    StructField("MentionDocTranslationInfo",StringType(),True),
    StructField("Extras",StringType(),True)
    ])

SOURCES_SCHEMA = StructType([
    StructField("MentionSource",StringType(),True),
    StructField("FIPSCode",StringType(),True),
    StructField("Country",StringType(),True)
    ])

In [83]:
events = spark.read.option("sep", "\t").csv("./Data/20181125101500.export.CSV", schema=EVENTS_SCHEMA)
mentions = spark.read.option("sep", "\t").csv("./Data/20181125101500.mentions.CSV", schema=MENTIONS_SCHEMA)
sourcesbycountry = spark.read.csv("./Data/sources_by_country.CSV", schema=SOURCES_SCHEMA)

In [84]:
events.registerTempTable('events')
mentions.registerTempTable('mentions')
sourcesbycountry.registerTempTable('sourcesbycountry')

In [85]:
query = 'select GLOBALEVENTID, MentionSourceName, MentionDocTone, FIPSCode, Country as LocMention \
        from mentions \
        inner join sourcesbycountry on MentionSourceName = MentionSource \
        where MentionType == 1 '
        
mentions_by_country = spark.sql(query)

In [77]:
a_PANDAS = a.toPandas()

In [78]:
a_PANDAS

Unnamed: 0,GLOBALEVENTID,MentionSourceName,MentionDocTone,FIPSCode,LocMention
0,805089351,1590walg.com,-2.752294,US,United States
1,805097820,1590walg.com,-2.752294,US,United States
2,805120197,1590walg.com,-2.752294,US,United States
3,805110768,1590walg.com,-2.752294,US,United States
4,805094356,1590walg.com,-2.752294,US,United States
5,805120189,1590walg.com,-2.752294,US,United States
6,805119622,1590walg.com,-2.752294,US,United States
7,805119621,1590walg.com,-2.752294,US,United States
8,805119617,1590walg.com,-2.752294,US,United States
9,805119616,1590walg.com,-2.752294,US,United States


In [94]:
mentions_by_country.registerTempTable('mentions_by_country')

query2 = 'select MentionSourceName, avg(MentionDocTone), FIPSCode, LocMention \
        from mentions_by_country \
        inner join events on GLOBALEVENTID = GLOBALEVENTID_event \
        group by EventRootCode '

a = spark.sql(query2)

AnalysisException: "expression 'mentions_by_country.`MentionSourceName`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;\nAggregate [EventRootCode#2774], [MentionSourceName#2872, avg(cast(MentionDocTone#2881 as double)) AS avg(MentionDocTone)#2937, FIPSCode#2901, LocMention#2906]\n+- Join Inner, (GLOBALEVENTID#2868L = GLOBALEVENTID_event#2746L)\n   :- SubqueryAlias mentions_by_country\n   :  +- Project [GLOBALEVENTID#2868L, MentionSourceName#2872, MentionDocTone#2881, FIPSCode#2901, Country#2902 AS LocMention#2906]\n   :     +- Filter (MentionType#2871L = cast(1 as bigint))\n   :        +- Join Inner, (MentionSourceName#2872 = MentionSource#2900)\n   :           :- SubqueryAlias mentions\n   :           :  +- Relation[GLOBALEVENTID#2868L,EventTimeDate#2869L,MentionTimeDate#2870L,MentionType#2871L,MentionSourceName#2872,MentionIdentifier#2873,SentenceID#2874L,Actor1CharOffset#2875L,Actor2CharOffset#2876L,ActionCharOffset#2877L,InRawText#2878L,Confidence#2879L,MentionDocLen#2880L,MentionDocTone#2881,MentionDocTranslationInfo#2882,Extras#2883] csv\n   :           +- SubqueryAlias sourcesbycountry\n   :              +- Relation[MentionSource#2900,FIPSCode#2901,Country#2902] csv\n   +- SubqueryAlias events\n      +- Relation[GLOBALEVENTID_event#2746L,Day_DATE#2747,MonthYear_Date#2748,Year_Date#2749,FractionDate#2750,Actor1Code#2751,Actor1Name#2752,Actor1CountryCode#2753,Actor1KnownGroupCode#2754,Actor1EthnicCode#2755,Actor1Religion1Code#2756,Actor1Religion2Code#2757,Actor1Type1Code#2758,Actor1Type2Code#2759,Actor1Type3Code#2760,Actor2Code#2761,Actor2Name#2762,Actor2CountryCode#2763,Actor2KnownGroupCode#2764,Actor2EthnicCode#2765,Actor2Religion1Code#2766,Actor2Religion2Code#2767,Actor2Type1Code#2768,Actor2Type2Code#2769,... 37 more fields] csv\n"

In [93]:
a.show()

+--------------------+
| avg(MentionDocTone)|
+--------------------+
|-0.07276077301074298|
|  2.1126761436462402|
|  -5.477879267229244|
| -2.0851970010995866|
| -2.6560804386933645|
|  -5.843654960393906|
| -4.3547542437911035|
|  -4.851153053161575|
| 0.20722191072872415|
|  -5.917507246749042|
|  -2.733864177266757|
| -1.4259407611829893|
| -1.4315563865551135|
| -1.6861296660370297|
| -1.4480408430099487|
| -2.8219092587629953|
|  -1.860250787436962|
|  0.4467647147589716|
| -3.7685873839590283|
| -1.7028985818227131|
+--------------------+

