# Practice 

In [3]:
import os

# Run everything relative to the project root so the relative data paths used
# later in the notebook resolve. The location can be overridden per machine via
# the NEWS_RELIABILITY_ROOT environment variable; the default is the original
# author's local checkout, so existing behaviour is unchanged when it is unset.
project_root = os.environ.get(
    "NEWS_RELIABILITY_ROOT",
    "/Users/nimisha/Desktop/learning/news-reliability-detector/",
)
# Raises FileNotFoundError if the directory does not exist.
os.chdir(project_root)

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import (
    HashingTF,
    IDF,
    Tokenizer,
    VectorAssembler,
)
from pyspark.ml import Pipeline

In [5]:
# Start a Spark session named "NewsReliability", or reuse one that is already
# running in this kernel.
spark = (
    SparkSession.builder
    .appName("NewsReliability")
    .getOrCreate()
)

24/01/13 12:55:57 WARN Utils: Your hostname, centella.local resolves to a loopback address: 127.0.0.1; using 192.168.1.71 instead (on interface en0)
24/01/13 12:55:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/13 12:55:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/13 12:55:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/01/13 12:55:59 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [6]:
# Load the processed CSV (path is relative to the working directory); the first
# row is used as the header and column types are inferred by Spark.
data = spark.read.csv(
    "data/processed/processed.csv",
    header=True,
    inferSchema=True,
)

## Train Test Split

In [7]:
# 80/20 random split into train and test sets; the seed is fixed so the split
# is repeatable.
train_data, test_data = data.randomSplit([0.8, 0.2], seed=100)

In [8]:
# Optional sanity check of the split sizes (each count() triggers a Spark job):
# print(train_data.count())
# print(test_data.count())

### Count vectorization
- counts how many times each word shows up in the text (a frequency count)
- transforms the counts into a DTM (Document-Term Matrix)
- Matrices are stored as sparse matrices to save space, since most values are 0
- Bag of Words

### TF-IDF 
- Term Frequency - Inverse Document Frequency
- addresses the weakness of raw term counts: words that appear in almost every document get large counts without being informative
- instead of filling the DTM with word frequency counts, it fills it with TF-IDF scores
- TF: raw count of a term in the document
- but what about words that are common across documents? IDF!
- IDF: diminishes the weight of terms that occur frequently in the document set and increases the weight of terms that occur rarely.

### TfidfVectorizer
- does both: creates the bag of words and converts it to TF-IDF

### Tokenization
- splits the content into an array of words

In [9]:
# Turn the "content" text column into an array-of-words column, "tokenized".
# (Spark's Tokenizer lowercases the text and splits it on whitespace.)
tokenizer = Tokenizer(
    inputCol="content",
    outputCol="tokenized",
)

### HashingTF
- for term frequencies

In [10]:
# Length of the hashed term-frequency vector (number of hash buckets).
# NOTE(review): Spark's HashingTF documentation advises a power of two so that
# terms map evenly onto the columns, and a value this small will cause many
# hash collisions. It is kept at 100 so results match the outputs recorded in
# this notebook; revisit before training a real model.
NUM_HASH_FEATURES = 100

# Map each token array to a fixed-length term-frequency vector via the hashing
# trick: "tokenized" -> "tfhashed".
hashingTf = HashingTF(
    inputCol="tokenized",
    outputCol="tfhashed",
    numFeatures=NUM_HASH_FEATURES,
)



### IDF
- for inverse document frequency

In [11]:
# Rescale the hashed term frequencies by inverse document frequency:
# "tfhashed" -> "features". IDF is an estimator, so its weights are learned
# when the pipeline is fitted.
idf = IDF(
    inputCol="tfhashed",
    outputCol="features",
)

### Vector Assembler

In [12]:
# Assemble the model input vector, "features_vector", from the listed columns.
# NOTE(review): with "features" as the only input this stage just re-wraps the
# TF-IDF vector; confirm whether more columns are meant to be added here.
feature_assembler = VectorAssembler(
    inputCols=["features"],
    outputCol="features_vector",
)

## Pipeline

In [13]:
# Chain the feature stages in execution order:
# tokenize -> hashed term frequency -> IDF weighting -> vector assembly.
pipeline = Pipeline(
    stages=[
        tokenizer,
        hashingTf,
        idf,
        feature_assembler,
    ]
)

In [14]:
# Fit the pipeline on the training split only, so statistics learned by the
# estimator stages (the IDF weights) never see the test data.
pipeline_model = pipeline.fit(dataset=train_data)

                                                                                

In [15]:
# Apply the fitted pipeline to both splits (train first, then test).
train_transformed, test_transformed = (
    pipeline_model.transform(split) for split in (train_data, test_data)
)

In [16]:
# Preview the first two transformed test rows.
test_transformed.show(n=2)

+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|target|             content|           tokenized|            tfhashed|            features|     features_vector|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     0|460 peopl injur c...|[460, peopl, inju...|(100,[5,6,8,9,12,...|(100,[5,6,8,9,12,...|(100,[5,6,8,9,12,...|
|     0|ackman valeant pl...|[ackman, valeant,...|(100,[0,1,2,3,4,5...|(100,[0,1,2,3,4,5...|[0.87414456771326...|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [17]:
# Preview the first two transformed training rows.
train_transformed.show(n=2)

+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|target|             content|           tokenized|            tfhashed|            features|     features_vector|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     0|17yearold danish ...|[17yearold, danis...|(100,[0,1,8,14,15...|(100,[0,1,8,14,15...|(100,[0,1,8,14,15...|
|     0|2 hous republican...|[2, hous, republi...|(100,[2,4,5,11,13...|(100,[2,4,5,11,13...|(100,[2,4,5,11,13...|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



24/01/13 12:56:16 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
