In [188]:
!pip install pyspark



In [189]:
import pyspark

In [190]:
import pandas as pd

pd.read_csv('names.csv')

Unnamed: 0,Name;age;Experience;Salary
0,pedro;35;10;30000
1,lucas;40;8;35000
2,sofia;26;4;20000
3,ines;47;3;15000


In [191]:
from pyspark.sql import SparkSession

In [192]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [193]:
spark

In [194]:
df_pyspark = spark.read.csv('names.csv')

In [195]:
df_pyspark.show()

+--------------------+
|                 _c0|
+--------------------+
|Name;age;Experien...|
|   pedro;35;10;30000|
|    lucas;40;8;35000|
|    sofia;26;4;20000|
|     ines;47;3;15000|
+--------------------+



In [196]:
spark.read.option('header', 'true').csv('names.csv').show()

+--------------------------+
|Name;age;Experience;Salary|
+--------------------------+
|         pedro;35;10;30000|
|          lucas;40;8;35000|
|          sofia;26;4;20000|
|           ines;47;3;15000|
+--------------------------+



In [197]:
df_pyspark = spark.read.option('header', 'true').option('delimiter', ';').csv('names.csv')

df_pyspark.show()

+-----+---+----------+------+
| Name|age|Experience|Salary|
+-----+---+----------+------+
|pedro| 35|        10| 30000|
|lucas| 40|         8| 35000|
|sofia| 26|         4| 20000|
| ines| 47|         3| 15000|
+-----+---+----------+------+



In [198]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [199]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [200]:
#Age no es string, hay que agregar algo mas:
df_pyspark = spark.read.option('header', 'true').option('delimiter', ';').csv('names.csv', inferSchema = True)
df_pyspark.show()

+-----+---+----------+------+
| Name|age|Experience|Salary|
+-----+---+----------+------+
|pedro| 35|        10| 30000|
|lucas| 40|         8| 35000|
|sofia| 26|         4| 20000|
| ines| 47|         3| 15000|
+-----+---+----------+------+



In [201]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [202]:
#Para ver una columna
df_pyspark.select('Name').show()

+-----+
| Name|
+-----+
|pedro|
|lucas|
|sofia|
| ines|
+-----+



In [203]:
#Para ver mas de una columna
df_pyspark.select(['Name', 'age']).show()

+-----+---+
| Name|age|
+-----+---+
|pedro| 35|
|lucas| 40|
|sofia| 26|
| ines| 47|
+-----+---+



In [204]:
#Ver tipo de dato de cada columna
df_pyspark.dtypes


ValueError: Unable to parse datatype from schema. An error occurred while calling o739.json.
: java.lang.NoSuchMethodError: org.json4s.JsonDSL$.pair2Assoc(Lscala/Tuple2;Lscala/Function1;)Lorg/json4s/JsonDSL$JsonAssoc;
	at org.apache.spark.sql.types.StructType.jsonValue(StructType.scala:411)
	at org.apache.spark.sql.types.StructType.jsonValue(StructType.scala:102)
	at org.apache.spark.sql.types.DataType.json(DataType.scala:76)
	at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [205]:
df_pyspark.describe().show()

+-------+-----+-----------------+-----------------+-----------------+
|summary| Name|              age|       Experience|           Salary|
+-------+-----+-----------------+-----------------+-----------------+
|  count|    4|                4|                4|                4|
|   mean| null|             37.0|             6.25|          25000.0|
| stddev| null|8.831760866327848|3.304037933599835|9128.709291752768|
|    min| ines|               26|                3|            15000|
|    max|sofia|               47|               10|            35000|
+-------+-----+-----------------+-----------------+-----------------+



In [206]:
#Adding columns con valor constante
from pyspark.sql.functions import col,lit
df_pyspark = df_pyspark.withColumn('Experience', lit(2))


In [207]:
df_pyspark.show()

+-----+---+----------+------+
| Name|age|Experience|Salary|
+-----+---+----------+------+
|pedro| 35|         2| 30000|
|lucas| 40|         2| 35000|
|sofia| 26|         2| 20000|
| ines| 47|         2| 15000|
+-----+---+----------+------+



In [208]:
#Adding con otra columna existente
df_pyspark = df_pyspark.withColumn('Age_2024', df_pyspark['age']+2)

In [209]:
df_pyspark.show()

+-----+---+----------+------+--------+
| Name|age|Experience|Salary|Age_2024|
+-----+---+----------+------+--------+
|pedro| 35|         2| 30000|      37|
|lucas| 40|         2| 35000|      42|
|sofia| 26|         2| 20000|      28|
| ines| 47|         2| 15000|      49|
+-----+---+----------+------+--------+



In [210]:
#Drop
df_pyspark = df_pyspark.drop('Age_2024')

In [211]:
df_pyspark.show()

+-----+---+----------+------+
| Name|age|Experience|Salary|
+-----+---+----------+------+
|pedro| 35|         2| 30000|
|lucas| 40|         2| 35000|
|sofia| 26|         2| 20000|
| ines| 47|         2| 15000|
+-----+---+----------+------+



In [212]:
#Rename column
df_pyspark = df_pyspark.withColumnRenamed('age', 'Age').show()

+-----+---+----------+------+
| Name|Age|Experience|Salary|
+-----+---+----------+------+
|pedro| 35|         2| 30000|
|lucas| 40|         2| 35000|
|sofia| 26|         2| 20000|
| ines| 47|         2| 15000|
+-----+---+----------+------+



#### EJERCICIO 2

In [213]:
df2 = spark.read.option('header', 'true').option('delimiter', ';').csv('test2.csv',inferSchema = True)



In [214]:
df2.show()

+--------+----+----------+------+
|    Name| age|Experience|Salary|
+--------+----+----------+------+
|   pedro|  35|         4|  null|
|   lucas|  40|         5| 25000|
|   sofia|  26|         3| 35000|
|    ines|  47|      null| 19000|
| nicolas|  21|         1|  null|
|    null|  25|         2| 21000|
|Patricia|null|         3| 25000|
+--------+----+----------+------+



In [215]:
#borrar las filas que tienen algun null (se borra la fila entera)
df2.na.drop().show()

ValueError: Unable to parse datatype from schema. An error occurred while calling o758.json.
: java.lang.NoSuchMethodError: org.json4s.JsonDSL$.pair2Assoc(Lscala/Tuple2;Lscala/Function1;)Lorg/json4s/JsonDSL$JsonAssoc;
	at org.apache.spark.sql.types.StructType.jsonValue(StructType.scala:411)
	at org.apache.spark.sql.types.StructType.jsonValue(StructType.scala:102)
	at org.apache.spark.sql.types.DataType.json(DataType.scala:76)
	at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [217]:
# Si queremos q borre solo si todos los valores de esa fila son null
df2.na.drop(how= "all").show()

ValueError: Unable to parse datatype from schema. An error occurred while calling o762.json.
: java.lang.NoSuchMethodError: org.json4s.JsonDSL$.pair2Assoc(Lscala/Tuple2;Lscala/Function1;)Lorg/json4s/JsonDSL$JsonAssoc;
	at org.apache.spark.sql.types.StructType.jsonValue(StructType.scala:411)
	at org.apache.spark.sql.types.StructType.jsonValue(StructType.scala:102)
	at org.apache.spark.sql.types.DataType.json(DataType.scala:76)
	at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [218]:
#por ejemplo especificamos que tiene q tener al menos dos valores q no sean null
df2.na.drop(how = "any", thresh = 2).show()

ValueError: Unable to parse datatype from schema. An error occurred while calling o764.json.
: java.lang.NoSuchMethodError: org.json4s.JsonDSL$.pair2Assoc(Lscala/Tuple2;Lscala/Function1;)Lorg/json4s/JsonDSL$JsonAssoc;
	at org.apache.spark.sql.types.StructType.jsonValue(StructType.scala:411)
	at org.apache.spark.sql.types.StructType.jsonValue(StructType.scala:102)
	at org.apache.spark.sql.types.DataType.json(DataType.scala:76)
	at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [219]:
#podemos elegir de que columna queremos que se fije si tiene nulls para borrar con subset
df2.na.drop(how = "any", subset=['age']).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  pedro| 35|         4|  null|
|  lucas| 40|         5| 25000|
|  sofia| 26|         3| 35000|
|   ines| 47|      null| 19000|
|nicolas| 21|         1|  null|
|   null| 25|         2| 21000|
+-------+---+----------+------+



In [220]:
#Filling los na con un valor especifico para todos y elegimos en que columna
df2.na.fill('na', ['Name']).show()

+--------+----+----------+------+
|    Name| age|Experience|Salary|
+--------+----+----------+------+
|   pedro|  35|         4|  null|
|   lucas|  40|         5| 25000|
|   sofia|  26|         3| 35000|
|    ines|  47|      null| 19000|
| nicolas|  21|         1|  null|
|      na|  25|         2| 21000|
|Patricia|null|         3| 25000|
+--------+----+----------+------+



In [221]:
#Realizamos un imputer para hacer nuevas columnas pero con los na rellenados, en este caso elegimos con la media
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=["{}_imputed".format(c) for c in ['age','Experience', 'Salary']]
    ).setStrategy("mean")

In [222]:
#agregamos el imputer al df
imputer.fit(df2).transform(df2).show()

+--------+----+----------+------+-----------+------------------+--------------+
|    Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+--------+----+----------+------+-----------+------------------+--------------+
|   pedro|  35|         4|  null|         35|                 4|         25000|
|   lucas|  40|         5| 25000|         40|                 5|         25000|
|   sofia|  26|         3| 35000|         26|                 3|         35000|
|    ines|  47|      null| 19000|         47|                 3|         19000|
| nicolas|  21|         1|  null|         21|                 1|         25000|
|    null|  25|         2| 21000|         25|                 2|         21000|
|Patricia|null|         3| 25000|         32|                 3|         25000|
+--------+----+----------+------+-----------+------------------+--------------+



#### EJERCICIO 3

In [223]:
df3 = spark.read.option('header', 'true').option('delimiter', ';').csv('test3.csv',inferSchema = True)


In [224]:
df3.show()

+--------+---+----------+------+
|    Name|age|Experience|Salary|
+--------+---+----------+------+
|   pedro| 35|         4| 24000|
|   lucas| 40|         5| 25000|
|   sofia| 26|         3| 35000|
|    ines| 47|         8| 19000|
| nicolas| 21|         1| 15000|
|   Pablo| 25|         2| 21000|
|Patricia| 29|         3| 25000|
+--------+---+----------+------+



In [225]:
#filtros
#Personas con un salario igual o menor a 2000
df3.select("Name","Salary").filter("Salary<=20000").show()

+-------+------+
|   Name|Salary|
+-------+------+
|   ines| 19000|
|nicolas| 15000|
+-------+------+



In [226]:
#filter con dos condiciones
df3.filter((df3["Salary"]<=24000)&(df3["Salary"]>=15000))\
   .select("Name", "Salary").show()

+-------+------+
|   Name|Salary|
+-------+------+
|  pedro| 24000|
|   ines| 19000|
|nicolas| 15000|
|  Pablo| 21000|
+-------+------+



In [227]:
#si queremos todo lo contratio a la condicion que ponemos hay que agregar ~
df3.filter(~(df3["Salary"]<=20000)).show()

+--------+---+----------+------+
|    Name|age|Experience|Salary|
+--------+---+----------+------+
|   pedro| 35|         4| 24000|
|   lucas| 40|         5| 25000|
|   sofia| 26|         3| 35000|
|   Pablo| 25|         2| 21000|
|Patricia| 29|         3| 25000|
+--------+---+----------+------+



#### EJERCICIO 4

In [228]:
df4 = spark.read.option('header', 'true').option('delimiter', ';').csv('test4.csv',inferSchema = True)

In [229]:
df4.show()

+--------+------------+------+
|    Name|Departaments|Salary|
+--------+------------+------+
|   pedro|Data Science| 10000|
|   lucas|         IOT|  5000|
|   pedro|    Big Data|  4000|
|   lucas|    Big Data|  4000|
| nicolas|         IOT| 20000|
| nicolas|Data Science| 15000|
|Patricia|Data Science| 25000|
+--------+------------+------+



In [230]:
df4.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departaments: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [231]:
#GroupBy para ver quien es el q tiene el mayor sueldo 
df4.groupBy('Name').sum().show()


+--------+-----------+
|    Name|sum(Salary)|
+--------+-----------+
| nicolas|      35000|
|Patricia|      25000|
|   pedro|      14000|
|   lucas|       9000|
+--------+-----------+



In [232]:
#groupBy departaments para ver cual es la media de los salarios
df4.groupBy('Departaments').mean().show()


+------------+------------------+
|Departaments|       avg(Salary)|
+------------+------------------+
|         IOT|           12500.0|
|    Big Data|            4000.0|
|Data Science|16666.666666666668|
+------------+------------------+



In [233]:
#groupby para contar cuantas personas hay en cada departamento
df4.groupBy('Departaments').count().show()

+------------+-----+
|Departaments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    2|
|Data Science|    3|
+------------+-----+

