# **Spark Activity**

In [1]:
!pip install pyspark findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 44 kB/s 
[?25hCollecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 54.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=19491c67c5c0d3b25165200f556ee84f9dd292bf9d854b88922925c7fe43830b
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark, findspark
Successfully installed findspark-2.0.1 py4j-0.10.9.5 pyspark-3.3.1


## **Importação das bibliotecas**

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from google.colab import drive 

import yaml
import findspark
import json
import hashlib


drive.mount('/content/drive')
findspark.init()
spark = SparkSession.builder.appName('SparkRDDExamples').getOrCreate()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## **Leitura dos arquivos CSV**

### **Arquivo JCS**

In [17]:
#leitura do csv
df_jcs = spark.read.csv("/content/drive/MyDrive/MBA_DE_02/Atividade spark/Source/jcs_2020.csv", header=True, sep=';', inferSchema=True)

#drop das colunas nao utilizadas
df_jcs = df_jcs.drop("Rank", "_c3", "_c6", "_c7")

#Tratamento do nome das colunas
df_jcs = df_jcs.withColumnRenamed("Full Journal Title", "Title") \
                .withColumnRenamed("Total Cites", "Total_Cites") \
                .withColumnRenamed("Journal Impact Factor", "Journal_Impact_Factor") \
                .withColumnRenamed("Eigenfactor Score", "Eigenfactor_Score")

#Criacao da coluna id_title com hash
df_jcs = df_jcs.withColumn("id_title", abs(hash(upper(col("Title")))))

df_jcs.limit(2).toPandas()

Unnamed: 0,Title,Total_Cites,Journal_Impact_Factor,Eigenfactor_Score,id_title
0,CA-A CANCER JOURNAL FOR CLINICIANS,55868,508.702,0.10514,132650473
1,NATURE REVIEWS MOLECULAR CELL BIOLOGY,58477,94.444,0.07548,1011077702


### **Arquivo SCIMAGO**

In [18]:
#leitura do csv
df_scimago = spark.read.csv("/content/drive/MyDrive/MBA_DE_02/Atividade spark/Source/scimagojr 2020.csv", header=True, sep=';', inferSchema=True)

#drop das colunas nao utilizadas
df_scimago = df_scimago.drop("Rank")

#Tratamento do nome das colunas
df_scimago = df_scimago.withColumnRenamed("Type", "type_publication") \
        .withColumnRenamed("Publisher", "publisher") \
        .withColumnRenamed("Issn", "issn_full") \
        .withColumnRenamed("SJR Best Quartile", "SJR_Best_Quartile") \
        .withColumnRenamed("H index", "H_index") \
        .withColumnRenamed("Total Docs. (2020)", "Total_Docs_2020") \
        .withColumnRenamed("Total Docs. (3years)", "Total_Docs_3years") \
        .withColumnRenamed("Total Refs.", "Total_Refs") \
        .withColumnRenamed("Total Cites (3years)", "Total_Cites_3years") \
        .withColumnRenamed("Citable Docs. (3years)", "Citable_Docs_3years") \
        .withColumnRenamed("Cites / Doc. (2years)", "Cites_Doc_2years") \
        .withColumnRenamed("Ref. / Doc.", "Ref_Doc") 
        
#Passando o conteudo da coluna issn_full, transformando em lista
df_scimago = df_scimago.withColumn("Issn", split(col("issn_full"),","))

#Explode da coluna issn
df_scimago = df_scimago.withColumn("Issn", explode(df_scimago.Issn))

#Criacao da coluna id_title com hash
df_scimago = df_scimago.withColumn("id_title", abs(hash(upper(col("Title")))))

df_scimago.limit(2).toPandas()

Unnamed: 0,Sourceid,Title,type_publication,issn_full,SJR,SJR_Best_Quartile,H_index,Total_Docs_2020,Total_Docs_3years,Total_Refs,...,Citable_Docs_3years,Cites_Doc_2years,Ref_Doc,Country,Region,publisher,Coverage,Categories,Issn,id_title
0,28773,Ca-A Cancer Journal for Clinicians,journal,"15424863, 00079235",62937,Q1,168,47,119,3452,...,80,12634,7345,United States,Northern America,Wiley-Blackwell,1950-2020,Hematology (Q1); Oncology (Q1),15424863,132650473
1,28773,Ca-A Cancer Journal for Clinicians,journal,"15424863, 00079235",62937,Q1,168,47,119,3452,...,80,12634,7345,United States,Northern America,Wiley-Blackwell,1950-2020,Hematology (Q1); Oncology (Q1),79235,132650473


## **Criando tabelas SQL temporárias para o JCS e SCIMAGO**

In [6]:
spark.sql('DROP TABLE IF EXISTS  table_scimago')
spark.sql('DROP TABLE IF EXISTS  table_jcs')

df_scimago.createTempView('table_scimago')

df_jcs.createTempView('table_jcs')

## **Join entre as tabelas temporárias JCS e SCIMAGO**

In [7]:
df_csv = spark.sql('''
    select  sc.Sourceid
      ,case 
        when sc.Title is not null
          then sc.Title
        else jcs.Title end as Title
      ,sc.type_publication
	  ,sc.issn_full
	  ,sc.Issn
	  ,sc.SJR
	  ,sc.SJR_Best_Quartile
	  ,sc.H_index
	  ,sc.Total_Docs_2020
	  ,sc.Total_Docs_3years
	  ,sc.Total_Refs
	  ,sc.Total_Cites_3years
	  ,sc.Citable_Docs_3years
	  ,sc.Cites_Doc_2years
	  ,sc.Ref_Doc
	  ,sc.Country
	  ,sc.Region
	  ,sc.publisher
	  ,sc.Coverage
	  ,sc.Categories
	  ,jcs.Total_Cites
	  ,jcs.Journal_Impact_Factor
	  ,jcs.Eigenfactor_Score
    from table_scimago sc 
    full outer join table_jcs jcs
      on sc.id_title = jcs.id_title
    ''')

spark.sql('DROP TABLE IF EXISTS  table_scimago')
spark.sql('DROP TABLE IF EXISTS  table_jcs')

DataFrame[]

In [None]:
df_csv.limit(2).toPandas()

Unnamed: 0,Sourceid,Title,type_publication,issn_full,Issn,SJR,SJR_Best_Quartile,H_index,Total_Docs_2020,Total_Docs_3years,...,Cites_Doc_2years,Ref_Doc,Country,Region,publisher,Coverage,Categories,Total_Cites,Journal_Impact_Factor,Eigenfactor_Score
0,21100981225,Gynecology,journal,"20795831, 20795696",20795831,,-,1,57,0,...,0,2846,Russian Federation,Eastern Europe,Consilium Medikum,2020,Obstetrics and Gynecology,,,
1,21100981225,Gynecology,journal,"20795831, 20795696",20795696,,-,1,57,0,...,0,2846,Russian Federation,Eastern Europe,Consilium Medikum,2020,Obstetrics and Gynecology,,,


In [8]:
print(df_csv.count())

52036


## **Leitura do arquivo csv - exportado do bibtex**

In [19]:
#leitura do arquivo csv
df_bib = spark.read.csv("/content/drive/MyDrive/MBA_DE_02/Atividade spark/Source/file_bib.csv", sep='|', header=True, inferSchema=True)

#Drop da coluna nao utilizada
df_bib = df_bib.drop("_c0")

df_bib.limit(2).toPandas()

Unnamed: 0,series,location,keywords,numpages,pages,booktitle,abstract,doi,url,address,...,articleno,month,journal,issn,number,volume,issue_date,note,edition,editor
0,BDE 2019,"Hong Kong, Hong Kong","Sentiment analysis, Big data quality metrics, ...",8,36–43,Proceedings of the 2019 International Conferen...,"In a world increasingly connected, and in whic...",10.1145/3341620.3341629,https://doi.org/10.1145/3341620.3341629,"New York, NY, USA",...,,,,,,,,,,
1,SITA'20,"Rabat, Morocco","Data Quality evaluation, Data Quality, Quality...",6,,Proceedings of the 13th International Conferen...,"In recent years, as more and more data sources...",10.1145/3419604.3419803,https://doi.org/10.1145/3419604.3419803,"New York, NY, USA",...,16.0,,,,,,,,,


## **Join do csv com arquivo exportado do bibtex**

In [11]:
df_end = df_bib.join(df_csv, df_csv.Issn == df_bib.issn, "left").drop(df_csv.Title) \
                                                                .drop(df_csv.Issn) \
                                                                .drop(df_csv.issn_full) \
                                                                .drop(df_csv.publisher) \
                                                                .drop(df_csv.type_publication) 
df_end.limit(2).toPandas()

Unnamed: 0,series,location,keywords,numpages,pages,booktitle,abstract,doi,url,address,...,Citable_Docs_3years,Cites_Doc_2years,Ref_Doc,Country,Region,Coverage,Categories,Total_Cites,Journal_Impact_Factor,Eigenfactor_Score
0,BDE 2019,"Hong Kong, Hong Kong","Sentiment analysis, Big data quality metrics, ...",8,36–43,Proceedings of the 2019 International Conferen...,"In a world increasingly connected, and in whic...",10.1145/3341620.3341629,https://doi.org/10.1145/3341620.3341629,"New York, NY, USA",...,,,,,,,,,,
1,SITA'20,"Rabat, Morocco","Data Quality evaluation, Data Quality, Quality...",6,,Proceedings of the 13th International Conferen...,"In recent years, as more and more data sources...",10.1145/3419604.3419803,https://doi.org/10.1145/3419604.3419803,"New York, NY, USA",...,,,,,,,,,,


In [12]:
print(df_end.count())

2974


## **Tratamento**

In [20]:
#Criacao das colunas id_title e id_issn com hash
df_end = df_end.withColumn("id_title", abs(hash(upper(col("title")))))
df_end = df_end.withColumn("id_issn", abs(hash(col("issn"))))

#Remocao dos duplicados
df_end = df_end.dropDuplicates(subset=['id_title', 'id_issn'])

print(df_end.count())

2591


## **Export para csv**

In [22]:
# Export CSV com Pandas
df_end.toPandas().to_csv("/content/drive/MyDrive/MBA_DE_02/Atividade spark/Export/export_pyspark.csv", sep='|', encoding='utf-8', header='true')

# Export CSV com a função write
df_end.write.options(header='True', delimiter='|').csv("/content/drive/MyDrive/MBA_DE_02/Atividade spark/Export/export_opc", mode='overwrite')
