In [15]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext, SQLContext


In [16]:
spark = SparkSession.builder.master("local[*]").appName("ExercicioIntro").getOrCreate()

In [17]:
df_nomes = spark.read.csv("nomes_aleatorios.txt")

In [35]:
df_nomes.show(5)

+----------------+
|             _c0|
+----------------+
|  Frances Bennet|
|   Jamie Russell|
|  Edward Kistler|
|   Sheila Maurer|
|Donald Golightly|
+----------------+
only showing top 5 rows



In [11]:
df_nomes.printSchema()  

root
 |-- _c0: string (nullable = true)



In [120]:
df_nomes = spark.read.csv("nomes_aleatorios.txt", header=False)

df_nomes = df_nomes.withColumnRenamed("_c0", "Nomes")

df_nomes.show(10)

+-----------------+
|            Nomes|
+-----------------+
|   Frances Bennet|
|    Jamie Russell|
|   Edward Kistler|
|    Sheila Maurer|
| Donald Golightly|
|       David Gray|
|      Joy Bennett|
|      Paul Kriese|
|Berniece Ornellas|
|    Brian Farrell|
+-----------------+
only showing top 10 rows



In [121]:
from pyspark.sql.functions import * 
from pyspark.sql.functions import when
from pyspark.sql.functions import rand


df_nomes = df_nomes.withColumn("Escolaridade", when(rand() < 0.33, "Fundamental")
                           .when(rand() < 0.66, "Medio")
                           .otherwise("Superior"))

df_nomes.show()

+-----------------+------------+
|            Nomes|Escolaridade|
+-----------------+------------+
|   Frances Bennet| Fundamental|
|    Jamie Russell|       Medio|
|   Edward Kistler|       Medio|
|    Sheila Maurer|       Medio|
| Donald Golightly|       Medio|
|       David Gray|    Superior|
|      Joy Bennett|       Medio|
|      Paul Kriese| Fundamental|
|Berniece Ornellas| Fundamental|
|    Brian Farrell| Fundamental|
|   Kara Mcelwaine|       Medio|
|    Tracy Herring|    Superior|
|  Howard Lazarine|       Medio|
|     Leroy Strahl|    Superior|
|     Ernest Hulet|    Superior|
|     David Medina|    Superior|
|   Lorenzo Woodis|    Superior|
|      Page Marthe| Fundamental|
|   Herbert Morris|       Medio|
|      Albert Leef|    Superior|
+-----------------+------------+
only showing top 20 rows



In [122]:
paises = ["Brasil", "Argentina", "Colômbia", "Chile", "Peru", "Venezuela", "Equador", "Bolívia", "Paraguai", "Uruguai", "Guiana", "Suriname", "Guiana Francesa"]

df_nomes = df_nomes.withColumn("Pais", when(rand() < 1/13, paises[0])
                                .when(rand() < 2/13, paises[1])
                                .when(rand() < 3/13, paises[2])
                                .when(rand() < 4/13, paises[3])
                                .when(rand() < 5/13, paises[4])
                                .when(rand() < 6/13, paises[5])
                                .when(rand() < 7/13, paises[6])
                                .when(rand() < 8/13, paises[7])
                                .when(rand() < 9/13, paises[8])
                                .when(rand() < 10/13, paises[9])
                                .when(rand() < 11/13, paises[10])
                                .when(rand() < 12/13, paises[11])
                                .otherwise(paises[12]))

df_nomes.show()

+-----------------+------------+---------+
|            Nomes|Escolaridade|     Pais|
+-----------------+------------+---------+
|   Frances Bennet| Fundamental|Venezuela|
|    Jamie Russell|       Medio| Colômbia|
|   Edward Kistler|       Medio|   Brasil|
|    Sheila Maurer|       Medio|   Brasil|
| Donald Golightly|       Medio|  Equador|
|       David Gray|    Superior|Argentina|
|      Joy Bennett|       Medio|Venezuela|
|      Paul Kriese| Fundamental|  Equador|
|Berniece Ornellas| Fundamental|Venezuela|
|    Brian Farrell| Fundamental|    Chile|
|   Kara Mcelwaine|       Medio|     Peru|
|    Tracy Herring|    Superior|     Peru|
|  Howard Lazarine|       Medio| Colômbia|
|     Leroy Strahl|    Superior|  Bolívia|
|     Ernest Hulet|    Superior|Argentina|
|     David Medina|    Superior|    Chile|
|   Lorenzo Woodis|    Superior|    Chile|
|      Page Marthe| Fundamental|Venezuela|
|   Herbert Morris|       Medio|     Peru|
|      Albert Leef|    Superior|Argentina|
+----------

In [123]:
df_nomes = df_nomes.withColumn("AnoNascimento", (1945 + (rand() * (2010 - 1945))).cast('int'))

df_nomes.show()

+-----------------+------------+---------+-------------+
|            Nomes|Escolaridade|     Pais|AnoNascimento|
+-----------------+------------+---------+-------------+
|   Frances Bennet| Fundamental|Venezuela|         1990|
|    Jamie Russell|       Medio| Colômbia|         1963|
|   Edward Kistler|       Medio|   Brasil|         1985|
|    Sheila Maurer|       Medio|   Brasil|         1969|
| Donald Golightly|       Medio|  Equador|         1973|
|       David Gray|    Superior|Argentina|         1951|
|      Joy Bennett|       Medio|Venezuela|         1959|
|      Paul Kriese| Fundamental|  Equador|         1963|
|Berniece Ornellas| Fundamental|Venezuela|         1946|
|    Brian Farrell| Fundamental|    Chile|         1999|
|   Kara Mcelwaine|       Medio|     Peru|         1956|
|    Tracy Herring|    Superior|     Peru|         2000|
|  Howard Lazarine|       Medio| Colômbia|         1981|
|     Leroy Strahl|    Superior|  Bolívia|         1980|
|     Ernest Hulet|    Superior

In [124]:
df_select = df_nomes.select("Nomes").filter((df_nomes.AnoNascimento >= 2000))

df_select.show(10)

+----------------+
|           Nomes|
+----------------+
|   Tracy Herring|
|    David Medina|
| Helen Blackwell|
|    Rebecca Snow|
|  Mary Dillahunt|
|   Ricky Gilbert|
|      Ned Tester|
|    James Barton|
|  Ashley Trosper|
|Richelle Vasquez|
+----------------+
only showing top 10 rows



In [203]:
df_nomes.createOrReplaceTempView ("Nomes")

spark.sql("select Nomes from Nomes where AnoNascimento >=2000").show(10)

+----------------+
|           Nomes|
+----------------+
|   Tracy Herring|
|    David Medina|
| Helen Blackwell|
|    Rebecca Snow|
|  Mary Dillahunt|
|   Ricky Gilbert|
|      Ned Tester|
|    James Barton|
|  Ashley Trosper|
|Richelle Vasquez|
+----------------+
only showing top 10 rows



In [126]:
count_millennials = df_nomes.select("Nomes").filter((df_nomes.AnoNascimento >= 1980) & (df_nomes.AnoNascimento <= 1994)).count()

print(count_millennials)

2309807


In [204]:
spark.sql("select count(*) as MILLENNIALS FROM Nomes WHERE AnoNascimento BETWEEN 1980 AND 1994").show()

+-----------+
|MILLENNIALS|
+-----------+
|    2309807|
+-----------+



In [182]:
geracao_Z = spark.sql("select Pais, count(*) as TOTAL_Z FROM Nomes WHERE AnoNascimento BETWEEN 1995 AND 2015 GROUP BY Pais")

geracao_Z = geracao_Z.withColumn("Geracoes", lit("GERACAO Z"))

geracao_Z.show()

+---------------+------+---------+
|           Pais| TOTAL| Geracoes|
+---------------+------+---------+
|      Argentina|327376|GERACAO Z|
|           Peru|369116|GERACAO Z|
|         Guiana|  3337|GERACAO Z|
|          Chile|426219|GERACAO Z|
|       Suriname|   601|GERACAO Z|
|       Paraguai| 38828|GERACAO Z|
|Guiana Francesa|    47|GERACAO Z|
|        Equador|171868|GERACAO Z|
|        Uruguai| 13573|GERACAO Z|
|      Venezuela|272989|GERACAO Z|
|        Bolívia| 90469|GERACAO Z|
|         Brasil|177678|GERACAO Z|
|       Colômbia|416663|GERACAO Z|
+---------------+------+---------+



In [183]:
geracao_X = spark.sql("select Pais, count(*) as TOTAL_X FROM Nomes WHERE AnoNascimento BETWEEN 1965 AND 1979 GROUP BY Pais")

geracao_X = geracao_X.withColumn("Geracoes", lit("GERACAO X"))

geracao_X.show()

+---------------+------+---------+
|           Pais| TOTAL| Geracoes|
+---------------+------+---------+
|      Argentina|327123|GERACAO X|
|           Peru|369115|GERACAO X|
|         Guiana|  3332|GERACAO X|
|          Chile|426365|GERACAO X|
|       Suriname|   592|GERACAO X|
|       Paraguai| 38763|GERACAO X|
|Guiana Francesa|    45|GERACAO X|
|        Equador|171298|GERACAO X|
|        Uruguai| 13256|GERACAO X|
|      Venezuela|271805|GERACAO X|
|        Bolívia| 90409|GERACAO X|
|         Brasil|177095|GERACAO X|
|       Colômbia|416128|GERACAO X|
+---------------+------+---------+



In [184]:
BB = spark.sql("select Pais, count(*) as TOTAL_BB FROM Nomes WHERE AnoNascimento BETWEEN 1944 AND 1964 GROUP BY Pais")

BB = BB.withColumn("Geracoes", lit("BABY BOOMERS"))

BB.show()

+---------------+------+------------+
|           Pais| TOTAL|    Geracoes|
+---------------+------+------------+
|      Argentina|436665|BABY BOOMERS|
|           Peru|492582|BABY BOOMERS|
|         Guiana|  4447|BABY BOOMERS|
|          Chile|568606|BABY BOOMERS|
|       Suriname|   744|BABY BOOMERS|
|       Paraguai| 52226|BABY BOOMERS|
|Guiana Francesa|    54|BABY BOOMERS|
|        Equador|228934|BABY BOOMERS|
|        Uruguai| 17827|BABY BOOMERS|
|      Venezuela|363341|BABY BOOMERS|
|        Bolívia|120463|BABY BOOMERS|
|         Brasil|237200|BABY BOOMERS|
|       Colômbia|553014|BABY BOOMERS|
+---------------+------+------------+



In [185]:
Millennials = spark.sql("select Pais, count(*) as TOTAL_M FROM Nomes WHERE AnoNascimento BETWEEN 1980 AND 1994 GROUP BY Pais")

Millennials = Millennials.withColumn("Geracoes", lit("MILLENNIALS"))

Millennials.show()

+---------------+------+-----------+
|           Pais| TOTAL|   Geracoes|
+---------------+------+-----------+
|      Argentina|327962|MILLENNIALS|
|           Peru|370268|MILLENNIALS|
|         Guiana|  3405|MILLENNIALS|
|          Chile|427353|MILLENNIALS|
|       Suriname|   587|MILLENNIALS|
|       Paraguai| 39455|MILLENNIALS|
|Guiana Francesa|    38|MILLENNIALS|
|        Equador|171212|MILLENNIALS|
|        Uruguai| 13275|MILLENNIALS|
|      Venezuela|272670|MILLENNIALS|
|        Bolívia| 90102|MILLENNIALS|
|         Brasil|177079|MILLENNIALS|
|       Colômbia|416401|MILLENNIALS|
+---------------+------+-----------+



In [193]:
j1 = geracao_Z.union(geracao_X)
j2 = Millennials.union(BB)
j3 = j1.union(j2)

j3.show()

+---------------+------+---------+
|           Pais| TOTAL| Geracoes|
+---------------+------+---------+
|      Argentina|327376|GERACAO Z|
|           Peru|369116|GERACAO Z|
|         Guiana|  3337|GERACAO Z|
|          Chile|426219|GERACAO Z|
|       Suriname|   601|GERACAO Z|
|       Paraguai| 38828|GERACAO Z|
|Guiana Francesa|    47|GERACAO Z|
|        Equador|171868|GERACAO Z|
|        Uruguai| 13573|GERACAO Z|
|      Venezuela|272989|GERACAO Z|
|        Bolívia| 90469|GERACAO Z|
|         Brasil|177678|GERACAO Z|
|       Colômbia|416663|GERACAO Z|
|      Argentina|327123|GERACAO X|
|           Peru|369115|GERACAO X|
|         Guiana|  3332|GERACAO X|
|          Chile|426365|GERACAO X|
|       Suriname|   592|GERACAO X|
|       Paraguai| 38763|GERACAO X|
|Guiana Francesa|    45|GERACAO X|
+---------------+------+---------+
only showing top 20 rows



In [194]:
j3.createOrReplaceTempView ("Total")

spark.sql("select * from Total").show(10)

+---------------+------+---------+
|           Pais| TOTAL| Geracoes|
+---------------+------+---------+
|      Argentina|327376|GERACAO Z|
|           Peru|369116|GERACAO Z|
|         Guiana|  3337|GERACAO Z|
|          Chile|426219|GERACAO Z|
|       Suriname|   601|GERACAO Z|
|       Paraguai| 38828|GERACAO Z|
|Guiana Francesa|    47|GERACAO Z|
|        Equador|171868|GERACAO Z|
|        Uruguai| 13573|GERACAO Z|
|      Venezuela|272989|GERACAO Z|
+---------------+------+---------+
only showing top 10 rows



In [197]:
j3 = spark.sql("SELECT * FROM Total ORDER BY Pais, Geracoes, Total")
j3.show()

+---------+------+------------+
|     Pais| TOTAL|    Geracoes|
+---------+------+------------+
|Argentina|436665|BABY BOOMERS|
|Argentina|327123|   GERACAO X|
|Argentina|327376|   GERACAO Z|
|Argentina|327962| MILLENNIALS|
|  Bolívia|120463|BABY BOOMERS|
|  Bolívia| 90409|   GERACAO X|
|  Bolívia| 90469|   GERACAO Z|
|  Bolívia| 90102| MILLENNIALS|
|   Brasil|237200|BABY BOOMERS|
|   Brasil|177095|   GERACAO X|
|   Brasil|177678|   GERACAO Z|
|   Brasil|177079| MILLENNIALS|
|    Chile|568606|BABY BOOMERS|
|    Chile|426365|   GERACAO X|
|    Chile|426219|   GERACAO Z|
|    Chile|427353| MILLENNIALS|
| Colômbia|553014|BABY BOOMERS|
| Colômbia|416128|   GERACAO X|
| Colômbia|416663|   GERACAO Z|
| Colômbia|416401| MILLENNIALS|
+---------+------+------------+
only showing top 20 rows



In [14]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
spark.stop()