In [1]:
# GCS Config
GCP_PROJECT = 'NS01-Project'
MODEL_BUCKET = 'gs://twitter_testtt'
VERSION_NAME = 'v1'
MODEL_NAME = 'xgmodel'

In [2]:
%load_ext google.cloud.bigquery

The google.cloud.bigquery extension is already loaded. To reload it, use:
  %reload_ext google.cloud.bigquery


In [5]:
%%bigquery protest_df
SELECT *
FROM `ns01-project.twitter_data.tweets`
LIMIT 2000

Query complete after 0.00s: 100%|██████████| 2/2 [00:00<00:00, 852.33query/s]                         
Downloading: 100%|██████████| 300/300 [00:01<00:00, 299.32rows/s]


In [6]:
protest_df

Unnamed: 0,id,text,lang,location,posted_at
0,1468665145928867844,RT @NadiaWhittomeMP: The Downing Street party ...,en,the simulation,2021-12-08 19:33:51
1,1468665159304368128,"RT @MintPressNews: ""Media bears a responsibili...",en,,2021-12-08 19:33:55
2,1468665170071281671,Woooiiiii folks in this space are saying they'...,en,"London, England",2021-12-08 19:33:57
3,1468665186772869124,RT @live_Tripathi: ओमीक्रोन का खौफ: AKTU के छा...,hi,,2021-12-08 19:34:01
4,1468665193391697940,Where and when do we get out on the streets to...,en,,2021-12-08 19:34:03
...,...,...,...,...,...
295,1468666224292806662,@IAmCiele @thenixmin @doshinswitch @Evoltal_ @...,en,,2021-12-08 19:38:09
296,1468666228088532994,RT @timeindawater1: 5.5.Donald Trump was not p...,en,,2021-12-08 19:38:09
297,1468666229468667904,RT @newsbht1: London chaos: Capital gripped by...,en,,2021-12-08 19:38:10
298,1468666233591681030,RT @NadiaWhittomeMP: The Downing Street party ...,en,,2021-12-08 19:38:11


In [9]:
violent_df = pd.DataFrame(protest_df[protest_df['lang']=='en']['text'], columns=['text']) # Violent word detection
location_df = protest_df[['location', 'id']] # location of tweet
where_df = protest_df # find location in context

In [8]:
import numpy as np
import pandas as pd
import re

### Location of tweet

In [10]:
def only_country(text):
    text = text.lower()
    text = text.replace(",", " ")
    spl_space_trigger = text.split(" ")
    
    if len(spl_space_trigger) > 1:
        text = spl_space_trigger[-1]
    else:
        return text
    
    if text == 'kingdom' or text == 'england':
        text = 'uk'
    elif text == 'states':
        text = 'usa'
    elif text == '':
        text = 'undefined'
        
    return text

In [11]:
location_df['location'] = location_df.apply(lambda row : only_country(row['location']) if(np.all(pd.notnull(row['location']))) else row['location'], axis=1)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  location_df['location'] = location_df.apply(lambda row : only_country(row['location']) if(np.all(pd.notnull(row['location']))) else row['location'], axis=1)


In [12]:
location_of_tweet = location_df.groupby("location").count().sort_values(by='id', ascending=False).head(10)

In [13]:
location_of_tweet

Unnamed: 0_level_0,id
location,Unnamed: 1_level_1
uk,33
undefined,13
usa,11
london,8
nigeria,6
pakistan,5
ca,4
deutschland,4
canada,4
serbia,3


## Location in context

In [14]:
import spacy
from spacy import displacy 

In [15]:
nlp = spacy.load("en_core_web_sm")
# Text with nlp
doc = nlp(" Multiple tornado warnings were issued for parts of New York on Sunday night.The first warning, which expired at 9 p.m., covered the Bronx, Yonkers and New Rochelle. More than 2 million people live in the impacted area.")
# Display Entities
displacy.render(doc, style="ent")

In [16]:
def clean_line(text):
    text = re.sub(r"http\S+", "", text)
    text = re.sub(r"@[A-Za-z0-9]+", "", text)
    text = re.sub(r"#[A-Za-z0-9]+", "", text)
    text = text.replace(":","")
    text = text.lower()
    text = text.strip()
    return text

def loc_from_text(df):
    new_df = df.copy()
    new_df = new_df[new_df['lang']=='en']
    text_list = new_df['text']
    loc = []
    for text in text_list:
        doc = nlp(text)
        loc.extend(ent.text for ent in doc.ents if ent.label_ in ['GPE'])
    return loc

In [17]:
loc_context = loc_from_text(where_df)

In [18]:
loc_context_df = pd.DataFrame(data = loc_context, columns=['location'])

In [19]:
loc_context_df['location'] = loc_context_df.apply(lambda row : clean_line(row['location']), axis=1)
loc_context_df = loc_context_df.drop(loc_context_df[loc_context_df['location'] == ""].index)

In [20]:
loc_result_df = pd.DataFrame(loc_context_df.value_counts().head(10), columns=['count'])

In [21]:
loc_result_df

Unnamed: 0_level_0,count
location,Unnamed: 1_level_1
london,7
canada,6
australia,5
uk,4
china,4
america,3
austria,3
lyari,3
chile,3
🇧|,2


## Violent Detection

In [22]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import udf

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

spark = (SparkSession
         .builder
         .appName("DataFrameHandOn")
         .master("local[*]")
         .getOrCreate())
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f8480123a00>


In [23]:
spark_df = pandas_to_spark(violent_df)
spark_df.cache()

DataFrame[text: string]

In [24]:
spark_df.show(10)

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+
|                text|
+--------------------+
|RT @NadiaWhittome...|
|RT @MintPressNews...|
|Woooiiiii folks i...|
|Where and when do...|
|RT @Ysbryd5: Nurs...|
|RT @AndersonAfDMd...|
|RT @AlinejadMasih...|
|RT @TomPope695079...|
|The Palestinian p...|
|RT @GeorgeMonbiot...|
+--------------------+
only showing top 10 rows



                                                                                

In [25]:
def preprocess(text_string):
    """
    Accepts a text string and replaces:
    1) urls with URLHERE
    2) lots of whitespace with one instance
    3) mentions with MENTIONHERE

    This allows us to get standardized counts of urls and mentions
    Without caring about specific people mentioned
    """
    space_pattern = '\s+'
    giant_url_regex = ('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|'
        '[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    mention_regex = '@[\w\-]+'
    parsed_text = re.sub(space_pattern, ' ', str(text_string))
    parsed_text = re.sub(giant_url_regex, '', str(parsed_text))
    parsed_text = re.sub(mention_regex, '', str(parsed_text))
    parsed_text = re.sub("[^a-zA-Z:,]+", ' ', str(parsed_text))
    parsed_text = parsed_text.replace('RT', '')
    parsed_text = parsed_text.replace('!', '')
    parsed_text = parsed_text.replace(':', '')
    parsed_text = parsed_text.strip('\'"')
    parsed_text = parsed_text.lower()
    parsed_text = parsed_text.lstrip()
    
    return parsed_text

In [26]:
txt_process_udf = udf(preprocess, StringType())
new_df = spark_df.withColumn('text', txt_process_udf('text'))

In [27]:
new_df.show(10)

+--------------------+
|                text|
+--------------------+
|the downing stree...|
|media bears a res...|
|woooiiiii folks i...|
|where and when do...|
|nurse karen organ...|
|australia, austri...|
|this father who i...|
|hundreds of thous...|
|the palestinian p...|
|this should be al...|
+--------------------+
only showing top 10 rows



#### Prediction

In [28]:
import xgboost as xgb
from pyspark.ml.feature import Tokenizer, Word2Vec
from pyspark.ml import Pipeline

In [29]:
# Load trained parameters from GCS
w2v = Word2Vec.load("gs://twitter_testtt/w2v_model1")

tokenizer = Tokenizer(inputCol="text", outputCol="words")
w2v_pipeline = Pipeline(stages=[tokenizer, w2v])
w2v_pipeline_model = w2v_pipeline.fit(new_df)
train_df = w2v_pipeline_model.transform(new_df)

21/12/08 19:44:16 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/12/08 19:44:16 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


In [30]:
train_df.show(10)

+--------------------+--------------------+--------------------+
|                text|               words|            Features|
+--------------------+--------------------+--------------------+
|the downing stree...|[the, downing, st...|[-0.0105149733019...|
|media bears a res...|[media, bears, a,...|[-0.0083240476390...|
|woooiiiii folks i...|[woooiiiii, folks...|[-0.0110597727221...|
|where and when do...|[where, and, when...|[-0.0114352357632...|
|nurse karen organ...|[nurse, karen, or...|[-0.0118300816852...|
|australia, austri...|[australia,, aust...|[-0.0069386506220...|
|this father who i...|[this, father, wh...|[-0.0137400973222...|
|hundreds of thous...|[hundreds, of, th...|[-0.0158928056286...|
|the palestinian p...|[the, palestinian...|[-0.0078335809833...|
|this should be al...|[this, should, be...|[-0.0098890137349...|
+--------------------+--------------------+--------------------+
only showing top 10 rows



In [31]:
# Load XGBoost model parameters
model = xgb.Booster()
model.load_model("./model1.bst")

In [32]:
train_features = train_df.select("Features").collect()
X_train = np.asarray([v[0].toArray() for v in train_features])
X_train = xgb.DMatrix(X_train)
pred = model.predict(X_train)

In [33]:
pred_df = pd.DataFrame(pred.astype(int), columns=["class"])
violent_result_df = pd.DataFrame(pred_df.value_counts(), columns=['count'])

In [34]:
violent_result_df

Unnamed: 0_level_0,count
class,Unnamed: 1_level_1
1,237
0,25


# Save result to BigQuery ---> BI

In [35]:
from google.cloud import bigquery

client = bigquery.Client()
table_id1 = 'ns01-project.results.tweets_location'
table_id2 = 'ns01-project.results.context_location'
table_id3 = 'ns01-project.results.violent_count'

In [240]:
job_config_table1 = bigquery.LoadJobConfig(schema=[
    bigquery.SchemaField("location", "STRING"),
    bigquery.SchemaField("id", "INT64")
])

job1 = client.load_table_from_dataframe(
    location_of_tweet, table_id1, job_config=job_config_table1
)

In [242]:
job_config_table2 = bigquery.LoadJobConfig(schema=[
    bigquery.SchemaField("location", "STRING"),
    bigquery.SchemaField("count", "INT64")
])

job2 = client.load_table_from_dataframe(
    loc_result_df, table_id2, job_config=job_config_table2
)

In [243]:
job_config_table3 = bigquery.LoadJobConfig(schema=[
    bigquery.SchemaField("class", "INT64"),
    bigquery.SchemaField("count", "INT64")
])

job3 = client.load_table_from_dataframe(
    violent_result_df, table_id3, job_config=job_config_table3
)