# Projekt ZTNBD

## Lokalna instalacja
###### Wymagania
- docker
- ~4GB wolnego miejsca na dysku
- zalecany linux + build-essential

###### Linux 
```bash
make notebook
```
###### Others (nie sprawdzane)
```bash
# lokalizacja - główny katalogu projektu
docker run -it --rm -v $(pwd):/home/jovyan/work -p 8888:8888 jupyter/pyspark-notebook
```

Komenda startuje kontener dockerowy z Jupyterem i podmontowuje katalog projektu.
Moduły znajdują się w katalogu `modules` i tam też będą lądowały kolejne.
Uruchomienie póki co możliwe jest tylko z poziomu notebook'a.

+ Dokumentacja google [Google Docs](https://docs.google.com/document/d/1IylTvJbRe8s_j_bZqbM-6nWVa2IQDjQiZx-NyJsGZbg)

## Moduły

##### PostTransformer
Klasa PostTransformer dziedziczy po klasach pyspark.ml.Transformer, pyspark.ml.param.shared.HasInputCol, pyspark.ml.param.shared.HasOutputCol. Posiada metodę transform, która przyjmuje na wejściu obiekt typu dataframe. Metoda ta wydobywa z kolumny inputCol, z formatu json, content i umieszcza go w kolumnie outputCol w postaci tekstu.
##### TranslateTransformer
Klasa TranslateTransformer dziedziczy po klasach pyspark.ml.Transformer, pyspark.ml.param.shared.HasInputCol, pyspark.ml.param.shared.HasOutputCol. Posiada metodę transform, która przyjmuje na wejściu obiekt typu dataframe. Metoda ta tłumaczy tekst zawarty w kolumnie inputCol  z języka polskiego na angielski i umieszcza go w kolumnie outputCol.
##### SentenceTransformer
Klasa SentenceTransformer dziedziczy po klasach pyspark.ml.Transformer, pyspark.ml.param.shared.HasInputCol, pyspark.ml.param.shared.HasOutputCol. Posiada metodę transform, która przyjmuje na wejściu obiekt typu dataframe. Metoda ta dzieli tekst zawarty w kolumnie inputCol  na zdania i umieszcza go w kolumnie outputCol w postaci tablicy tekstów.
##### SpeechPartsTransformer
Klasa SpeechPartsTransformer dziedziczy  po klasach pyspark.ml.Transformer, pyspark.ml.param.shared.HasInputCol, pyspark.ml.param.shared.HasOutputCol. Posiada metodę transform, która przyjmuje na wejściu obiekt typu dataframe. Metoda ta z tekstu zawartego w kolumnie inputCol  zlicza wystąpienie części mowy i wstawia do outputCol w postaci jsona.
##### SentimentTransformer
Klasa SentimentTransformer dziedziczy  po klasach pyspark.ml.Transformer, pyspark.ml.param.shared.HasInputCol, pyspark.ml.param.shared.HasOutputCol. Posiada metodę transform, która przyjmuje na wejściu obiekt typu dataframe. Metoda ta z tekstu zawartego w kolumnie inputCol  wylicza sentiment i wstawia do kolumny outputCol.
#####  MeanFeaturesTransformer
Klasa MeanFeaturesTransformer dziedziczy  po klasach pyspark.ml.Transformer, pyspark.ml.param.shared.HasInputCol, pyspark.ml.param.shared.HasOutputCol. Klasa ta przyjmuje dodatkowy parametr features, który zawiera listę nazw cech, które mają zostać zagregowane. Posiada metodę transform, która przyjmuje na wejściu obiekt typu dataframe. Metoda ta wylicza średnie wartości dla cech podanych w parametrze features, ze wszystkich obiektów jsonowych znajdujących się w tekście zawartym w kolumnie inputCol. Wyliczone wartości wstawia do kolumny outputCol w postaci listy double’i.
##### MedianFeaturesTransformer
Klasa MedianFeaturesTransformer dziedziczy  po klasach pyspark.ml.Transformer, pyspark.ml.param.shared.HasInputCol, pyspark.ml.param.shared.HasOutputCol. Klasa ta przyjmuje dodatkowy parametr features, który zawiera listę nazw cech, które mają zostać zagregowane. Posiada metodę transform, która przyjmuje na wejściu obiekt typu dataframe. Metoda ta wylicza medianę dla cech podanych w parametrze features, ze wszystkich obiektów jsonowych znajdujących się w tekście zawartym w kolumnie inputCol. Wyliczone wartości wstawia do kolumny outputCol w postaci listy double’i.
##### NumberOfOccurrencesFeaturesTransformer
Klasa NumberOfOccurrencesFeaturesTransformer dziedziczy  po klasach pyspark.ml.Transformer, pyspark.ml.param.shared.HasInputCol, pyspark.ml.param.shared.HasOutputCol. Klasa ta przyjmuje dodatkowy parametr features, który zawiera listę nazw cech, które mają zostać zagregowane. Posiada metodę transform, która przyjmuje na wejściu obiekt typu dataframe. Metoda ta zlicza ilość wystąpień cech podanych w parametrze features, ze wszystkich obiektów jsonowych znajdujących się w tekście zawartym w kolumnie inputCol. Wyliczone wartości wstawia do kolumny outputCol w postaci listy double’i.


## Przykład użycia

### Instalacja Textblob

In [1]:
!pip install textblob
!python -m textblob.download_corpora lite

[nltk_data] Downloading package brown to /home/jovyan/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /home/jovyan/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/jovyan/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
Finished.


### Inicjalizacja środowiska

In [2]:
import json

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.ml import Pipeline

from modules.posts import (
    SentenceTransformer, PostTransformer, TranslateTransformer,
    SpeechPartsTransformer, BasicSpeechPartsTransformer, SentimentTransformer
)
from modules.features_ import (
    MaxFeaturesTransformer,
    MeanFeaturesTransformer,
    MedianFeaturesTransformer,
    NumberOfOccurrencesFeaturesTransformer,
    FeatureTransformer,
    SelectFeaturesTransformer
)

sc = SparkContext('local[*]', 'PipelineFlow')
sess = SparkSession(sc)
sqlContext = SQLContext(sc)
    

### Wczytywanie plików

In [3]:
def load_features(spark_ctx, files):
    rdd = spark_ctx.wholeTextFiles(files)
    rdd = rdd.map(lambda x: (x[0], x[1]))
    df = rdd.toDF(['file', 'content'])
    return df

def load_posts(spark_ctx, files):
    rdd = spark_ctx.wholeTextFiles(files)
    rdd = rdd.map(lambda x: (x[0], json.loads(x[1])))
    df = rdd.toDF(['file', 'content'])
    return df

### Przykłady użycia transformerów z pipeline'ami

In [4]:
def transform_features(features_as_df):
    
    features = [
        "leaf",
        "has-attribute-class",
    ]
    
    featurer = FeatureTransformer();
    featurer.setInputCol('content').setOutputCol('features')
    
    feature_selector = SelectFeaturesTransformer(features=features)
    feature_selector.setInputCol('features').setOutputCol('selected_features')
    
    max_feature_transformer = MaxFeaturesTransformer()
    max_feature_transformer.setInputCol('selected_features').setOutputCol('max')
    
    mean_feature_transformer = MeanFeaturesTransformer()
    mean_feature_transformer.setInputCol('selected_features').setOutputCol('mean')
    
    median_feature_transformer = MedianFeaturesTransformer()
    median_feature_transformer.setInputCol('selected_features').setOutputCol('median')
    
    number_of_occurences_feature_transformer = NumberOfOccurrencesFeaturesTransformer()
    number_of_occurences_feature_transformer.setInputCol('selected_features').setOutputCol('number_of_occurences')
    
    stages = [
        featurer,
        feature_selector,
        max_feature_transformer,
        mean_feature_transformer,
        median_feature_transformer,
        number_of_occurences_feature_transformer
    ]
        
    pipeline = Pipeline(stages=stages)
    pipeline = pipeline.fit(features_as_df)
    pipeline = pipeline.transform(features_as_df)
    
    return pipeline

def transform_posts(posts_as_df):
    
    poster = PostTransformer()
    poster.setInputCol('content').setOutputCol('posts')
    
    translator = TranslateTransformer()
    translator.setInputCol('posts').setOutputCol('translated')
    
    sentencer = SentenceTransformer()
    sentencer.setInputCol('translated').setOutputCol('sentences')
    
    speech_parter = SpeechPartsTransformer()
    speech_parter.setInputCol('translated').setOutputCol('speechParts')
    
    basic_speech_parter = BasicSpeechPartsTransformer()
    basic_speech_parter.setInputCol('translated').setOutputCol('basicSpeechParts')
    
    sentimenter = SentimentTransformer()
    sentimenter.setInputCol('translated').setOutputCol('sentiments')

    stages = [
        poster,
        translator, 
        sentencer, 
        speech_parter,
        basic_speech_parter,
        sentimenter
    ]
    
    pipeline = Pipeline(stages=stages)
    pipeline = pipeline.fit(posts_as_df)
    pipeline = pipeline.transform(posts_as_df)
    
    return pipeline

### Wyniki:

In [5]:
feature_file = 'data/featuresample.json'
loaded_features = load_features(sc, feature_file)

result = transform_features(loaded_features)
             
result.select('selected_features','max','mean', 'median', 'number_of_occurences').collect()

[Row(selected_features={'leaf': [1.0, 2.0, 1.0, 0.0, 1.0, 1.0, 2.0], 'has-attribute-class': [1.0, 1.0, 1.0, 1.0]}, max={'leaf': 2.0, 'has-attribute-class': 1.0}, mean={'leaf': 1.1428571428571428, 'has-attribute-class': 1.0}, median={'leaf': 1.0, 'has-attribute-class': 1.0}, number_of_occurences={'leaf': 7, 'has-attribute-class': 4})]

In [6]:
post_files = 'data/posts/*'
loaded_posts = load_posts(sc, post_files)

post_pipeline = transform_posts(loaded_posts)
post_pipeline = post_pipeline.select('sentences', 'translated', 'speechParts', 'basicSpeechParts', 'sentiments')
post_pipeline.show(3)

+--------------------+--------------------+--------------------+----------------+--------------------+
|           sentences|          translated|         speechParts|basicSpeechParts|          sentiments|
+--------------------+--------------------+--------------------+----------------+--------------------+
|[Oh, please ... v...|[Oh, please ... v...|Map(IN -> 20, EX ...|    [48, 43, 17]|[[0.3,0.716666666...|
|[Oh, I'm also the...|[Oh, I'm also the...|Map(IN -> 44, RP ...|    [94, 83, 33]|[[0.1921212121212...|
|[Tell me, what do...|[Tell me, what do...|Map(IN -> 39, PRP...|    [79, 54, 31]|[[0.25,0.75], [-0...|
+--------------------+--------------------+--------------------+----------------+--------------------+



#### Metody pomocnicze:

In [7]:
def sample_output(posts_df):
    a = posts_df.select('sentences').first().sentences[0]
    b = posts_df.select('sentences').first().sentences[1]
    c = posts_df.select('sentences').first().sentences[2]
    d = posts_df.select('translated').first().translated[0]
    e = posts_df.select('speechParts').first().speechParts
    f = posts_df.select('sentiments').first().sentiments[0]
    g = posts_df.select('sentiments').first().sentiments[1]
    h = posts_df.select('sentiments').first().sentiments[2]
    scheme = '{}\n\n{}\n\n{}\n\n{}\n\n{}\n\n{}\n\n{}\n\n{}'
    print(scheme.format(a,b,c,d,e,f,g,h))