# 0. Ingesta de las tablas de la base de datos mediante Sqoop

In [None]:
# Switch guarding the Sqoop import cells below: they only run when this is
# True, so leaving it False skips the database ingestion.
ingesta = False

In [None]:
# When ingestion is enabled, use Sqoop to import the MySQL table
# `question_tags` (database `stacklite` on 10.0.4.201) into the Hive table
# `grupo1_question_tags`.
# NOTE(review): the database password is passed in clear text on the command
# line; Sqoop also offers `--password-file` / `-P` -- consider switching.
if ingesta:
    !JAVA_HOME=/usr/lib/jvm/java-7-oracle-cloudera sqoop import \
        --connect jdbc:mysql://10.0.4.201/stacklite \
        --username training \
        --password training \
        --table question_tags \
        --hive-import \
        --hive-table grupo1_question_tags

In [None]:
# When ingestion is enabled, use Sqoop to import the MySQL table `questions`
# (database `stacklite` on 10.0.4.201) into the Hive table
# `grupo1_questions`.
# NOTE(review): the database password is passed in clear text on the command
# line; Sqoop also offers `--password-file` / `-P` -- consider switching.
if ingesta:
    !JAVA_HOME=/usr/lib/jvm/java-7-oracle-cloudera sqoop import \
        --connect jdbc:mysql://10.0.4.201/stacklite \
        --username training \
        --password training \
        --table questions \
        --hive-import \
        --hive-table grupo1_questions

# 1. Carga la tabla de hive en un spark dataframe

In [1]:
from pyspark.sql import SparkSession

# Stop the session that the notebook kernel may have pre-created under the
# name `spark`, so the Hive-enabled session below is built from scratch
# (presumably needed for the warehouse/metastore settings to take effect --
# TODO confirm). Environments that do not inject `spark` would otherwise fail
# here with a NameError, so that case is tolerated explicitly.
try:
    spark.stop()
except NameError:
    # No pre-existing session in this environment; nothing to stop.
    pass

# Build a Hive-enabled session pointing at the cluster's warehouse directory
# and metastore service.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL Hive")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .config("hive.metastore.uris", "thrift://ip-10-0-4-11.eu-west-1.compute.internal:9083")
    .enableHiveSupport()
    .getOrCreate()
)

## 1.1 Selección de base de datos

In [3]:
# Select the `default` database so the unqualified table names used in the
# queries further down resolve against it.
spark.sql('use default')

DataFrame[]

## 1.2 Carga y fusión de tablas

In [4]:
# Pair every row of grupo1_question_tags with the row of grupo1_questions
# that has the same id, keeping two columns: `tag` and the question's
# creation_date (exposed as `fecha`). The result is marked for caching.
# NOTE(review): the name `stack_overflow_df` is rebound in section 3.1 to a
# derived DataFrame that is cached as well; this first cache is never
# explicitly unpersisted -- confirm that caching both is intended.
stack_overflow_df = spark.sql("select s.tag as tag, t.creation_date as fecha \
                              from grupo1_question_tags s, grupo1_questions t where s.id = t.id").cache()

In [5]:
#stack_overflow_df.show()

# 2. Constantes

In [6]:
# Directory prefix for every result CSV; filenames are appended to it with
# plain string concatenation, so the trailing slash is required.
stack_overflow_results_path = "/home/testing86/Proyecto/data/stack_overflow_results/"
# How many top-ranked tags are kept when results are limited/taken.
numberOfResults = 10

# 3. Preparar los datos

## 3.1 Eliminar mes y día de la fecha

In [7]:
from pyspark.sql.functions import year

# Reduce the date column to its calendar year: the resulting DataFrame has
# exactly two columns, `tag` and `year`, and is marked for caching.
stack_overflow_df = stack_overflow_df.select(
    'tag',
    year('fecha').alias('year'),
).cache()

# 4. Procesamiento de datos

## 4.1 Lista de años y número de tags por año

In [9]:
from pyspark.sql.functions import asc, count
# Count the tags recorded in each year, then order the result
# chronologically (oldest year first) and mark it for caching.
tags_per_year_df = stack_overflow_df.groupBy('year').agg(
    count('tag').alias('numberOfTags')
)
years_and_tags_counts_df = tags_per_year_df.sort(asc('year')).cache()

In [11]:
# Bring the `year` column to the driver as a plain Python list, in the
# DataFrame's (ascending) row order.
lista_years = [fila[0] for fila in years_and_tags_counts_df.select('year').collect()]

In [13]:
# Bring the `numberOfTags` column to the driver as a plain Python list, in
# the DataFrame's row order.
lista_tags_counts = [
    fila[0] for fila in years_and_tags_counts_df.select('numberOfTags').collect()
]

In [15]:
# Lookup table year -> total number of tags in that year, built by pairing
# the two driver-side lists position by position.
dict_years_tagsCounts = dict(zip(lista_years, lista_tags_counts))

### 4.1.1 Guardar los resultados a ficheros

In [17]:
# Write the per-year tag totals to a CSV (with header row, without the pandas
# index) inside the results directory.
years_and_tags_counts_filename = 'years_and_tags_counts.csv'
years_and_tags_counts_pdf = years_and_tags_counts_df.toPandas()
years_and_tags_counts_pdf.to_csv(
    stack_overflow_results_path + years_and_tags_counts_filename,
    header=True,
    index=False,
)

## 4.2 Top 10 de los tags más populares de cada año

In [18]:
from pyspark.sql.functions import count, desc, col, format_number

# One DataFrame per year (same order as `lista_years`): tags of that year
# ranked by number of mentions, most mentioned first. `Promedio` is the tag's
# mentions as a percentage of all tags of that year, formatted to two
# decimals and cast back to a float column.
popular_tags_df = [
    (stack_overflow_df
     .filter(stack_overflow_df.year == str(year_value))
     .groupBy('tag')
     .agg(count('tag').alias('Menciones'))
     .sort(desc('Menciones'))
     .withColumn('Promedio',
                 format_number(col('Menciones') / total_tags * 100, 2).cast('Float'))
     .cache())
    for year_value, total_tags in zip(lista_years, lista_tags_counts)
]

### 4.2.1 Guardar los resultados a ficheros

In [21]:
# Write one CSV per year containing that year's top `numberOfResults` tags.
# `popular_tags_df` was built in the same order as `lista_years`, so the two
# are walked in lockstep and the year in each filename is taken from the data
# itself. (A counter hard-coded to start at 2008 would mislabel every file if
# the years present did not start at 2008 or were not contiguous.)
for anno_csv, yearly_tags_df in zip(lista_years, popular_tags_df):
    popular_tags_filename = 'popular_tags_' + str(anno_csv) + '.csv'
    (yearly_tags_df
     .limit(numberOfResults)
     .toPandas()
     .to_csv(stack_overflow_results_path + popular_tags_filename,
             header=True, index=False))

## 4.3 Top 10 de los tags más populares del 2017

In [22]:
# Names of the first `numberOfResults` tags of the last year's ranking
# (the last entry of `popular_tags_df`), as a plain Python list.
top_rows_last_year = popular_tags_df[-1].select('tag').take(numberOfResults)
popular_tags_2017 = [fila[0] for fila in top_rows_last_year]

### 4.3.1 Guardar los resultados a ficheros

In [23]:
# Save the tag names at the top of the last year's ranking to a CSV (header
# row, no pandas index). The size of the list follows `numberOfResults`, the
# same constant used to build `popular_tags_2017` and the per-year files, so
# the outputs stay consistent if that constant is changed.
popular_tags_2017_filename = 'list_popular_tags_2017.csv'
(popular_tags_df[-1]
 .select('tag')
 .limit(numberOfResults)
 .toPandas()
 .to_csv(stack_overflow_results_path + popular_tags_2017_filename,
         header=True, index=False))

### 4.3.2 Extracto de 20000 tags del 2017

In [24]:
# Save up to 20000 tag rows belonging to the last year in `lista_years` to a
# CSV (header row, no pandas index).
tags_20K_2017_filename = 'tags_20K_2017.csv'
tags_last_year_df = stack_overflow_df.filter(
    stack_overflow_df.year == str(lista_years[-1])
)
tags_extract_pdf = tags_last_year_df.select('tag').limit(20000).toPandas()
tags_extract_pdf.to_csv(
    stack_overflow_results_path + tags_20K_2017_filename,
    header=True,
    index=False,
)

## 4.4 Evolución histórica de los tags más populares del 2017

In [25]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def average_tag_per_year(year, tag_count):
    """Return a tag's mentions as a percentage of all tags of that year.

    Args:
        year: Key into the module-level ``dict_years_tagsCounts`` mapping.
        tag_count: Number of mentions of the tag in that year.

    Returns:
        ``tag_count`` divided by the year's total tag count, times 100,
        always as a ``float``.

    Raises:
        KeyError: If ``year`` is not a key of ``dict_years_tagsCounts``.
    """
    counts_year = dict_years_tagsCounts[year]
    # Force float division: with two ints, Python 2 would truncate the ratio
    # to 0, and the function is registered as a DoubleType UDF, so it should
    # hand back a float rather than an int.
    return (float(tag_count) / counts_year) * 100

# Expose the percentage helper to Spark as a UDF producing doubles.
udf_average_tag_per_year = udf(average_tag_per_year, DoubleType())

# One DataFrame per tag in `popular_tags_2017` (same order): the tag's
# mentions per year, oldest year first. `Promedio` is the UDF's percentage,
# formatted to two decimals and cast back to a float column.
historical_tags_df = [
    (stack_overflow_df
     .filter(stack_overflow_df.tag == tag_name)
     .groupBy('year')
     .agg(count('year').alias('Menciones'))
     .sort(asc('year'))
     .withColumn('Promedio',
                 format_number(udf_average_tag_per_year('year', 'Menciones'), 2).cast('Float'))
     .cache())
    for tag_name in popular_tags_2017
]

### 4.4.1 Guardar los resultados a ficheros

In [27]:
# Write one CSV per entry of `historical_tags_df`; the filename carries the
# entry's 1-based position in the list as its rank.
for rank, tag_history_df in enumerate(historical_tags_df, start=1):
    popular_tags_2017_historical_filename = (
        'popular_tags_2017_rank_' + str(rank) + '_historical.csv'
    )
    tag_history_df.toPandas().to_csv(
        stack_overflow_results_path + popular_tags_2017_historical_filename,
        header=True,
        index=False,
    )