### Dette er en python notebook som leses inn i Databricks.  
Vi skal gjennomføre en enkel tekstanalyse i Spark:  
* Lese inn twittermeldinger som inneholder emnetagg #MAGA
* Skille ut individuelle ord
* Slette typiske 'stoppord'
* Lese inn en sentimentordbok vi skal matche mot twitterord
* Lage en gjennomsnittsscore for hver tweet, fra positiv til negativ
* Lese score tilbake til originalsettet, og eksportere for visualisering av resultater (Oppgave 2)  

#### Hovedformålet er å vise hvordan skyteknologi benyttes til å kverne store mengder data med SQL, pySpark eller Scala - uten betydlige forvaltningsmiljøer.  

### For å kjøre koden i en celle benytt 'Play' til høyre eller benytt Ctrl+Enter.  
  
### Oppgaven består av å kjøre cellene, samt se på resultatet av kjøringer.

In [2]:
# Vi skal benytte en blanding av python og SQL - 
# samt hente pakker fra et maskinlæringsbibliotek ml.feature for å kverne tekstdata enklere.
from pyspark.sql import SQLContext, DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import io
import requests
import urllib
import csv
from pyspark.ml.feature import HashingTF, IDF,Tokenizer, RegexTokenizer, StopWordsRemover
import pandas as pd
sqlContext = SQLContext(sc)

##### I denne prosessen laster vi inn rådata, omdøper kolonner og velger å slette 'RT' fra twitterteksten.  
Det vanligste er å benytte spark.table som kan sammenliknes med dataframes i Python/Pandas.  

### * Action: benytt display() for å se på dataframe og hvilke kolonner vi mottar fra Twitter

In [4]:
#Github-datasett for analyse, inneholder 15000 tweets med emneknagg #MAGA
url = "https://raw.githubusercontent.com/mariustr/twitter-sentiment/master/twitterdata.csv"
urllib.urlretrieve(url, 'twitterdata.csv')
#Les inn temp-fil som Spark-tabell:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', delimiter = ';').load('file:/databricks/driver/twitterdata.csv')
#display(df)

###### Før vi henter ut enkeltord i twittermedldingene, vil vi fjerne tegnsetting og gjøre om all tekst til små bokstaver.  
Deretter separerer vi ut enkeltord fra twittermeldingen. Tokenizer-pakken hjelper med prosessen. Her velger vi også å lage en ny tabell med færre kolonner før vi fortsetter analysen.  
For mer avansert tekstprosessering kan man også benytte Tokenizer.Regex-funksjoner.  

### * Action: For Tokenizer-funksjonen, fyll inn inputkolonne for twittermeldinger, og ny outputkolonne for resultatet. Outputkolonnen skal hete "words".

In [6]:
#Gjør om til små bokstaver og flern tegnsetting
df = df.withColumn('text', lower(df.text))
df = df.withColumn('text', regexp_replace('text',r'[^a-z0-9\s]',''))
# Benytt Tokonizer i pyspark.ml-pakke for å skille ut enkeltord
tokenizer = Tokenizer(inputCol="text",outputCol="words")
tweetData = tokenizer.transform(df)
#Begrenser datasettet til 3 kolonner for videreanalyse. Originaltabell 'df' vil bestå.
tweetData = tweetData.select('id', 'text','words')
#display(df)

In [7]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(tweetData)
featurizedData.show(5)

##### Trekk ut stoppord, de mest brukte nøytrale ordene fra en engelsk ordliste (default).  
### Action: Velg inputkolonne hvor du skal slette stoppord. Velg nytt navn på outputkolonne.

In [9]:
remover = StopWordsRemover(inputCol="words", outputCol = "filtered")
tweetDataStopRemoved = remover.transform(tweetData)
#display(df2)

##### Last inn sentimentdata fra Github. Vi skal benytte et datasett med enkeltord, i neste iterasjon kunne vi benyttet et annet datasett som har score per kombinasjon av to ord.

In [11]:
#Les fra Github
url2 = "https://raw.githubusercontent.com/mariustr/twitter-sentiment/master/S140-AFFLEX-NEGLEX-unigrams.txt"
urllib.urlretrieve(url2, 'sentimentdata.txt')
#Les inn som Sparktabell
sentimentoppslag = sqlContext.read.format('com.databricks.spark.csv').options(header='false', inferschema='true', delimiter = '\t').load('file:/databricks/driver/sentimentdata.txt')
#Skriv om kolonnenavn
sentimentoppslag = sentimentoppslag.withColumnRenamed("_c0", "sentimentword")
sentimentoppslag = sentimentoppslag.withColumnRenamed("_c1", "sentimentscore")
#Behold bare de to første kolonnene
sentimentoppslag = sentimentoppslag.select("sentimentword", "sentimentscore")
#Skift datatype fra tekst til desimal for scoringskolonnen
sentimentoppslag = sentimentoppslag.withColumn("sentimentscore", sentimentoppslag.sentimentscore.cast('float'))
#display(sentimentoppslag)

In [12]:
tweetsentimentframe = tweetDataStopRemoved.select('id',explode('filtered').alias('sentimentword')).join(sentimentoppslag, 'sentimentword')
#display(tweetsentimentframe)

In [13]:
sentiment_prediction = tweetsentimentframe.groupBy('id').agg(avg('sentimentscore').alias('snittscore'))
display(sentiment_prediction)

In [14]:
tweetDataSentimentScore = df.join(sentiment_prediction, "id")
#tweetDataSentimentScore.sort("id").desc().collect()
display(tweetDataSentimentScore)

In [15]:
sentiment_prediction.SkrivTilS3('/dbfs/FileStore/tables/scores.csv', sep=',', header=True, index=False)
sentiment_prediction.SkrivTilRedshiftDB('/dbfs/FileStore/tables/scores.csv', sep=',', header=True, index=False)