# Dados de Entrada
* Link: https://tinyurl.com/bigdata-gut-pt
* Selecione "Adicionar ao Drive"


## Equipe:
* Kaleb Roncatti de Souza
* Nelson Gomes Brasil Júnior

# Setup

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!wget -q https://downloads.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
!tar xf spark-3.2.2-bin-hadoop3.2.tgz
!pip install findspark pyspark 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 41 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 43.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=c3e1bec6b7dad7fb073f8559dc344f874dcda021059914c85ccf00091470556f
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark, findspark
Successfully installed findspark-2.0.1 py4j-0.10.9.5 pyspark-3.3.1


In [3]:
%env PYTHONHASHSEED=1234
%env JAVA_HOME=/usr/lib/jvm/default-java
%env SPARK_HOME=/content/spark-3.2.2-bin-hadoop3.2

env: PYTHONHASHSEED=1234
env: JAVA_HOME=/usr/lib/jvm/default-java
env: SPARK_HOME=/content/spark-3.2.2-bin-hadoop3.2


In [4]:
import findspark
findspark.init("/content/spark-3.2.2-bin-hadoop3.2")

In [5]:
from pyspark.sql import SparkSession

from datetime import datetime

appName = 'Big Data'
master = 'local'

spark = SparkSession.builder     \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Entradas de teste e validação

Os comandos abaixo criam duas entradas para desenvolvimento e validação no diretório `entradas_teste`:

*   Arquivo A.txt: "Um carro está dirigindo na rua."
*   Arquivo B.txt: "Um caminhão está dirigindo na rodovia."
*   Arquivo C.txt: "Um motorista encontrou com outro motorista."

O resultado esperado para a etapa de cálculo de TF para estes arquivos é:

```
[(('A.txt', 'um'), 0.16666666666666666),
 (('A.txt', 'carro'), 0.16666666666666666),
 (('A.txt', 'está'), 0.16666666666666666),
 (('A.txt', 'dirigindo'), 0.16666666666666666),
 (('A.txt', 'na'), 0.16666666666666666),
 (('A.txt', 'rua'), 0.16666666666666666),
 (('B.txt', 'um'), 0.16666666666666666),
 (('B.txt', 'caminhão'), 0.16666666666666666),
 (('B.txt', 'está'), 0.16666666666666666),
 (('B.txt', 'dirigindo'), 0.16666666666666666),
 (('B.txt', 'na'), 0.16666666666666666),
 (('B.txt', 'rodovia'), 0.16666666666666666),
 (('C.txt', 'um'), 0.16666666666666666),
 (('C.txt', 'motorista'), 0.3333333333333333),
 (('C.txt', 'encontrou'), 0.16666666666666666),
 (('C.txt', 'com'), 0.16666666666666666),
 (('C.txt', 'outro'), 0.16666666666666666)]
 ```

 O resultado esperado para o cálculo de IDF é:

````
[('um', 0.0),
 ('carro', 1.0986122886681098),
 ('está', 0.4054651081081644),
 ('dirigindo', 0.4054651081081644),
 ('na', 0.4054651081081644),
 ('rua', 1.0986122886681098),
 ('caminhão', 1.0986122886681098),
 ('rodovia', 1.0986122886681098),
 ('motorista', 1.0986122886681098),
 ('encontrou', 1.0986122886681098),
 ('com', 1.0986122886681098),
 ('outro', 1.0986122886681098)]
 ````

Finalmente, o resultado final (TD IDF ordenado) esperado é:

```
[(('C.txt', 'motorista'), 0.37),
 (('B.txt', 'caminhão'), 0.18),
 (('B.txt', 'rodovia'), 0.18),
 (('C.txt', 'encontrou'), 0.18),
 (('C.txt', 'com'), 0.18),
 (('C.txt', 'outro'), 0.18),
 (('A.txt', 'carro'), 0.18),
 (('A.txt', 'rua'), 0.18),
 (('A.txt', 'está'), 0.07),
 (('B.txt', 'está'), 0.07),
 (('A.txt', 'dirigindo'), 0.07),
 (('B.txt', 'dirigindo'), 0.07),
 (('A.txt', 'na'), 0.07),
 (('B.txt', 'na'), 0.07),
 (('A.txt', 'um'), 0.0),
 (('B.txt', 'um'), 0.0),
 (('C.txt', 'um'), 0.0)]
```

Use estas entradas para desenvolvimento, testes, e validação. Para a solução final, processe todos os arquivos no diretório `all` do pacote de dados.


In [6]:
!mkdir entradas_teste

In [7]:
!rm entradas_teste/*

rm: cannot remove 'entradas_teste/*': No such file or directory


In [8]:
!echo "Um carro está dirigindo na rua." >> entradas_teste/A.txt

In [9]:
!echo "Um caminhão está dirigindo na rodovia." >> entradas_teste/B.txt

In [10]:
!echo "Um motorista encontrou com outro motorista." >> entradas_teste/C.txt

# Solução

In [11]:
import re
import numpy as np

In [12]:
# Conjunto completo (para solução final)
input_dir = 'file:/content/drive/My Drive/gut-pt/all/'

In [13]:
# Conjunto menor (para desenvolvimento)
# input_dir = 'file:/content/drive/My Drive/gut-pt/small/'

In [14]:
# Entradas teste (para testes e verificação)
# input_dir = 'file:/content/entradas_teste/'

In [15]:
# Leitura dos dados
input_files = spark.sparkContext.wholeTextFiles(input_dir+"*")

In [16]:
# Limpeza do Texto

input_files_clean = input_files.map(lambda line: (line[0].split("/")[-1], re.sub('[^a-zà-ù ]', ' ', line[1].lower())))   

In [17]:
# Conjunto de dados termo-documento
# ((documento, termo), 1)

term_doc = input_files_clean.flatMap(lambda line: [((line[0], i), 1) for i in line[1].split()]) 

In [18]:
# Calculando quantas vezes cada termo aparece no documento

term_times = term_doc.reduceByKey(lambda c1, c2: c1 + c2)\
                     .map(lambda line: (line[0][0], (line[0][1], line[1])))

In [19]:
# Calculando o tamanho de cada documento

size = term_doc.map(lambda line: (line[0][0], line[1]))\
               .reduceByKey(lambda c1, c2: c1 + c2)

In [20]:
# Agrupando resultados

joined_tf = term_times.join(size)

In [21]:
joined_tf.take(10)

[('u-23526-8', (('sonhavam', 1), 24613)),
 ('u-23526-8', (('duquezas', 2), 24613)),
 ('u-23526-8', (('taful', 1), 24613)),
 ('u-23526-8', (('bandidos', 1), 24613)),
 ('u-23526-8', (('redemptor', 4), 24613)),
 ('u-23526-8', (('parti', 1), 24613)),
 ('u-23526-8', (('intrujões', 1), 24613)),
 ('u-23526-8', (('atirem', 1), 24613)),
 ('u-23526-8', (('cortejo', 1), 24613)),
 ('u-23526-8', (('arco', 1), 24613))]

In [22]:
def computing_TF(line):
  
  # separating variables
  document = line[0]
  values = line[1]

  # values = ((term, freq), size)
  term_freq = values[0]
  
  term = term_freq[0]
  freq = term_freq[1] 
  size = values[1]

  # TF = frequency/size of the document
  TF = freq/size

  yield((term, document), TF)

In [23]:
# Calculando TF

TF = joined_tf.flatMap(computing_TF)

In [24]:
TF.take(10)

[(('alguma', 'u-21799-8'), 0.00036075036075036075),
 (('soërguer', 'u-21799-8'), 0.00018037518037518038),
 (('juba', 'u-21799-8'), 0.00018037518037518038),
 (('alastro', 'u-21799-8'), 0.00018037518037518038),
 (('incerta', 'u-21799-8'), 0.00018037518037518038),
 (('melodia', 'u-21799-8'), 0.00018037518037518038),
 (('will', 'u-21799-8'), 0.0012626262626262627),
 (('below', 'u-21799-8'), 0.0005411255411255411),
 (('proprietary', 'u-21799-8'), 0.00018037518037518038),
 (('outras', 'u-21799-8'), 0.00018037518037518038)]

In [25]:
## Ocorrencia dos termos em cada documento

input_files_clean = input_files.map(lambda line: (re.sub('[^a-zà-ù ]', ' ', line[1].lower()), line[0].split("/")[-1]))

In [26]:
term_document_unique = input_files_clean.flatMap(lambda line: [(i, line[1]) for i in line[0].split()])\
                                        .distinct()

In [27]:
# Mapeando e contando em quantos documentos aparece o termo

occurrence_term = term_document_unique.map(lambda line: (line[0], 1))\
                                      .reduceByKey(lambda c1, c2: c1 + c2)

In [28]:
# Contanto a quantidade de documentos

num_files = input_files.count()

In [29]:
def computing_IDF(line):
  
  # separating variables
  term = line[0]
  freq = line[1]

  # TF = log(qty_docs/freq)
  IDF = np.log(num_files/freq)

  yield(term, IDF)

In [30]:
IDF = occurrence_term.flatMap(computing_IDF)

In [31]:
def map_TF(line):
  term_doc = line[0]

  term = term_doc[0]
  document = term_doc[1]
  TF = line[1]
  
  yield((term, (document, TF)))

In [32]:
TF_map = TF.flatMap(map_TF)

In [33]:
def compute_TF_IDF(line):

  # values = ((doc, TF), IDF)
  values = line[1]
  doc_TF = values[0]
  

  term = line[0]
  document = doc_TF[0]
  TF = doc_TF[1]
  IDF = values[1]

  yield(document, term, TF*IDF)

In [34]:
final_solution = TF_map.join(IDF)\
                       .flatMap(compute_TF_IDF)\
                       .sortBy(lambda line: line[-1], ascending=False)

In [35]:
final_solution.take(1000).saveAsTextFile("TF_IDFsolution")

AttributeError: ignored