In [1]:
import os # OS e.g directory structure
import numpy as np # linear algebra
import scipy as sc  # scientific computing
import pandas as pd # data processing, file I/O
import seaborn as sns  # visualization
import matplotlib.pyplot as plt # visualization
import warnings
warnings.filterwarnings("ignore")

In [14]:

import sys

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:

! ls -la
! head -n 3 library-collection-inventory.csv

total 22983760
drwxr-xr-x  5 macbookpro  staff          160 19 Mai 10:41 [34m.[m[m
drwxr-xr-x  6 macbookpro  staff          192  6 Mai 14:27 [34m..[m[m
drwxr-xr-x  3 macbookpro  staff           96  7 Mai 00:31 [34m.ipynb_checkpoints[m[m
-rw-r--r--@ 1 macbookpro  staff        13157 19 Mai 10:41 TP_BIG_DATA.ipynb
-rw-rw-r--@ 1 macbookpro  staff  11764863851  1 Dez  2019 library-collection-inventory.csv
BibNum,Title,Author,ISBN,PublicationYear,Publisher,Subjects,ItemType,ItemCollection,FloatingItem,ItemLocation,ReportDate,ItemCount
3011076,"A tale of two friends / adapted by Ellie O'Ryan ; illustrated by Tom Caulfield, Frederick Gardner, Megan Petasky, and Allen Tam.","O'Ryan, Ellie","1481425730, 1481425749, 9781481425735, 9781481425742",2014.,"Simon Spotlight,","Musicians Fiction, Bullfighters Fiction, Best friends Fiction, Friendship Fiction, Adventure and adventurers Fiction",jcbk,ncrdr,Floating,qna,2017-09-01T00:00:00.000,1
2248846,"Naruto. Vol. 1, Uzumaki Naruto / story and 

In [4]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, TimestampType, DoubleType,ArrayType

fire_schema = StructType([StructField("BibNum", IntegerType(),True),
                             StructField("Title", StringType(),True),
                             StructField("Author", StringType(),True),
                             StructField("ISBN", IntegerType(),True),
                             StructField("PublicationYear", IntegerType(),True),
                             StructField("Publisher", StringType(),True),
                             StructField("Subjects", StringType(), True),
                             StructField("ItemType", StringType(),True),
                             StructField("ItemCollection", StringType(),True),
                             StructField("FloatingItem", StringType(),True),
                             StructField("ItemLocation", StringType(),True),
                             StructField("ReportDate", TimestampType(),True),
                             StructField("ItemCount", IntegerType(),True)
                            ])


In [5]:


df = spark.read.csv('library-collection-inventory.csv',header=True, schema=fire_schema, sep=",")



### Exploratory data analysis

In [6]:
df.printSchema()
cols = df.columns
df.count()

root
 |-- BibNum: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Author: string (nullable = true)
 |-- ISBN: integer (nullable = true)
 |-- PublicationYear: integer (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Subjects: string (nullable = true)
 |-- ItemType: string (nullable = true)
 |-- ItemCollection: string (nullable = true)
 |-- FloatingItem: string (nullable = true)
 |-- ItemLocation: string (nullable = true)
 |-- ReportDate: timestamp (nullable = true)
 |-- ItemCount: integer (nullable = true)



35531308

# Data cleaning




In [7]:
df_1 = df.dropna(subset=["Author","Publisher"])

In [8]:
df_1.select("Subjects").filter("PublicationYear is not null").show()

+--------------------+
|            Subjects|
+--------------------+
|Grandfathers Juve...|
|Brigham Young Uni...|
|Cooking Japanese,...|
|Counting Fiction,...|
|Parents Death Fic...|
|Missing persons F...|
|Paper Juvenile li...|
|Mount Rainier Nat...|
|Giraffe Juvenile ...|
|Sharks Juvenile f...|
|Undercover operat...|
|                null|
|Superheroes Comic...|
|                null|
|Probation Fiction...|
|Unemployment Juve...|
|Rand Paul 1914 19...|
|                null|
|Russo Japanese Wa...|
|Quilting, Quiltin...|
+--------------------+
only showing top 20 rows



In [9]:
max_row =10000

df_clean = df_1.limit(max_row)

df_clean.count()

10000

In [10]:
from pyspark.ml.feature import StringIndexer

def transformToIndex(inputArr, data):
    indexer=None 
    for col_in, col_out in inputArr.items():
        
        indexer = StringIndexer(inputCol=col_in, outputCol=col_out)
        data = indexer.fit(data).transform(data)
        
        # converçao para int 
        data = data.withColumn(col_out, data[col_out].cast(IntegerType()))
    return data
col_to_trasform = {'ItemType': 'ItemTypeIndex',
                   'Author':'AuthorIndex',
                   'Publisher':'PublisherIndex',
                   'ItemCollection': 'ItemCollectionIndex'}


In [11]:
df_clean = transformToIndex(col_to_trasform,df_clean)

transformToIndex(col_to_trasform,df_clean)

In [12]:
df_clean.printSchema()


root
 |-- BibNum: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Author: string (nullable = true)
 |-- ISBN: integer (nullable = true)
 |-- PublicationYear: integer (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Subjects: string (nullable = true)
 |-- ItemType: string (nullable = true)
 |-- ItemCollection: string (nullable = true)
 |-- FloatingItem: string (nullable = true)
 |-- ItemLocation: string (nullable = true)
 |-- ReportDate: timestamp (nullable = true)
 |-- ItemCount: integer (nullable = true)
 |-- ItemTypeIndex: integer (nullable = true)
 |-- AuthorIndex: integer (nullable = true)
 |-- PublisherIndex: integer (nullable = true)
 |-- ItemCollectionIndex: integer (nullable = true)



### Tratamento da Variavel Preditora 

In [22]:
# conversao de string para array
df_clean = df_clean.withColumn('Subjects_split', split(col('Subjects'), ",").cast("array<string>"))

In [23]:
df_clean.printSchema()

df_clean.select('Subjects_split','PublisherIndex','AuthorIndex').filter("Subjects is not null").show()

root
 |-- BibNum: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Author: string (nullable = true)
 |-- ISBN: integer (nullable = true)
 |-- PublicationYear: integer (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Subjects: string (nullable = true)
 |-- ItemType: string (nullable = true)
 |-- ItemCollection: string (nullable = true)
 |-- FloatingItem: string (nullable = true)
 |-- ItemLocation: string (nullable = true)
 |-- ReportDate: timestamp (nullable = true)
 |-- ItemCount: integer (nullable = true)
 |-- ItemTypeIndex: integer (nullable = true)
 |-- AuthorIndex: integer (nullable = true)
 |-- PublisherIndex: integer (nullable = true)
 |-- ItemCollectionIndex: integer (nullable = true)
 |-- Subjects_split: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+--------------+-----------+
|      Subjects_split|PublisherIndex|AuthorIndex|
+--------------------+--------------+-----------+
|[Musicians Fictio...

IllegalArgumentException: requirement failed: Column Subjects_split must be of type class org.apache.spark.ml.linalg.VectorUDT:struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually class org.apache.spark.sql.types.ArrayType:array<string>.

In [None]:
df_clean.drop("PublisherIndex")\
.printSchema()
#df_clean.select('Author','Publisher').show()

In [None]:
df_clean.select('Author','Publisher','PublisherIndex').show()

In [16]:
col_to_drop = ['ItemCount','FloatingItem','BibNum','ReportDate']

In [27]:
cols_feactures = [c for c in cols if c not in col_to_drop]
col_interesting = ['PublisherIndex','AuthorIndex']

In [None]:
df_clean.write.format("parquet").mode("overwrite")\
                .save("library-file.parquet")

In [None]:
! ls -la

In [28]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Correlation requires vectors so prior we convert to vector column

vector_col = "corr_features"
assembler = VectorAssembler(inputCols=col_interesting, outputCol=vector_col)
df_vector = assembler.transform(df_clean).select(vector_col)

# get correlation matrix

matrix = Correlation.corr(df_vector, vector_col).collect()[0][0]
corrmatrix = matrix.toArray().tolist()
# corrmatrix
corrmatrix

[[1.0, 0.1672540728248159], [0.1672540728248159, 1.0]]

In [None]:
def ScatterPlot(df, width, height):
    plt.figure(figsize=(width, height))
    plt.pcolor(df)
    plt.yticks(np.arange(0.5, len(df.index), 1), df.index)
    plt.xticks(np.arange(0.5, len(df.columns), 1), df.columns)
    plt.show()

In [None]:
# Python DataFrame for visualization

df_plot = pd.DataFrame(data=corrmatrix)
# df_plot

In [None]:
ScatterPlot(df_plot, 20, 20)

In [None]:
df_clean.describe(["AuthorIndex","PublisherIndex"]).show()