<a href="https://colab.research.google.com/github/jpforol/Studies/blob/main/Pyspark_Introduction_and_Exemples.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive

# Mount the user's Google Drive into the Colab filesystem at this path.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Pyspark
!pip3 install pyspark==3.0.2

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark

In [4]:
import findspark
# Must run before the pyspark import below: findspark resolves the Spark
# location and makes pyspark importable.
findspark.init()
from pyspark.sql import SparkSession

# Local Spark session for the notebook; the Spark UI port is set to 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# Named `numbers` rather than `list`: rebinding `list` would shadow the
# built-in type for every later cell run in this kernel.
numbers = [1, 2, 3, 4, 5]
rdd1 = spark.sparkContext.parallelize(numbers)

In [7]:
# Echo the RDD object itself; the notebook shows its repr, not its elements.
rdd1

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

In [8]:
# Build an RDD of text lines from a README file.
# NOTE(review): nothing in this notebook creates /opt/spark - confirm the file
# exists in the runtime before running an action on this RDD.
readme_path = "/opt/spark/README.md"
rdd2 = spark.sparkContext.textFile(readme_path)
rdd2

/opt/spark/README.md MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:0

In [11]:
# Split every line on whitespace and flatten the pieces into one RDD of words.
rddf = rdd2.flatMap(
    lambda text_line: text_line.split()
)
rddf

PythonRDD[3] at RDD at PythonRDD.scala:53

In [14]:
# Average age per person name, computed with the RDD API
dataRDD = spark.sparkContext.parallelize(
    [("Pedro", 38), ("Maria", 20), ("Pedro",40), ("Rafael", 10)]
)

# Step 1: (name, age) -> (name, (age, 1)) so the sum and the count travel together.
age_count_pairs = dataRDD.map(lambda rec: (rec[0], (rec[1], 1)))

# Step 2: per name, add up the ages and the counts.
totals_by_name = age_count_pairs.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)

# Step 3: divide each name's age total by its count.
agesRDD = totals_by_name.map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
agesRDD.collect()

[('Pedro', 39.0), ('Maria', 20.0), ('Rafael', 10.0)]

In [22]:
# Average age per person name, computed with the DataFrame API
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("Ages").getOrCreate()

# Build the input DataFrame from (name, age) rows.
people_rows = [("Pedro", 38), ("Maria", 20), ("Pedro",40), ("Rafael", 10)]
people_columns = ["nome", "idade"]
data_df = spark.createDataFrame(people_rows, people_columns)

# Group by name and average the age column.
mean_age = avg("idade")
avg_df = data_df.groupBy("nome").agg(mean_age)

avg_df.show()

+------+----------+
|  nome|avg(idade)|
+------+----------+
| Pedro|      39.0|
| Maria|      20.0|
|Rafael|      10.0|
+------+----------+



In [38]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import rand, randn

spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

sqlc = SQLContext(spark.sparkContext)

# One million ids, plus a seeded `rand` column and a seeded `randn` column.
ids = sqlc.range(0, 1000*1000)
with_uniform = ids.withColumn('uniform', rand(seed=10))
df = with_uniform.withColumn('normal', randn(seed=27))

# Print the row count, then show the first rows.
row_count = df.count()
print('# rows: ', row_count)
df.show()

# rows:  1000000
+---+-------------------+--------------------+
| id|            uniform|              normal|
+---+-------------------+--------------------+
|  0| 0.1709497137955568| -0.8664700627108758|
|  1| 0.8051143958005459| -0.5970491018333267|
|  2| 0.5775925576589018| 0.18267161219540898|
|  3| 0.9476047869880925| -1.8497305679917546|
|  4|    0.2093704977577|  0.9410417279045351|
|  5|0.36664222617947817| -0.6516475674670159|
|  6| 0.8078688178371882|  0.5901002135239671|
|  7| 0.7135143433452461|  -1.850241871360443|
|  8| 0.7195325566306053| 0.09176896733073023|
|  9|0.31335292311175456|-0.38605118617831075|
| 10| 0.8062503712025726|  1.2134544166783332|
| 11|0.10814914646176654| -1.0757702531630617|
| 12| 0.3362232980701172| 0.04961226872064977|
| 13| 0.8133304803837667|  -0.768259602441542|
| 14|0.47649428738170896|  0.2911293146907403|
| 15|  0.524728096293865|-0.33406080411047484|
| 16| 0.9701253460019921|  1.3607097640771781|
| 17| 0.6232167713919952|  0.59867729810827

In [39]:
# Count, mean, standard deviation, minimum and maximum of each column
summary_df = df.describe()
summary_df.show()

+-------+------------------+--------------------+--------------------+
|summary|                id|             uniform|              normal|
+-------+------------------+--------------------+--------------------+
|  count|           1000000|             1000000|             1000000|
|   mean|          499999.5|  0.5002526365480662|0.001079181935319...|
| stddev|288675.27893234405| 0.28892225121213544|  0.9991903695324608|
|    min|                 0|2.710561290975022E-7|  -4.955719833927487|
|    max|            999999|  0.9999999832771045|   4.626848789533737|
+-------+------------------+--------------------+--------------------+



In [45]:
# Statistical correlation between columns.
# Built from the SparkSession directly: `SQLContext` is deprecated in Spark 3.0
# and this cell no longer needs the `sqlc` object created in an earlier cell.
df = (
    spark.range(0, 1000*1000)
    .withColumn('rand1', rand(seed=10))
    .withColumn('rand2', rand(seed=27))
)

# A column compared with itself, then two differently seeded random columns.
print('cor (rand2, rand2): ', df.stat.corr('rand2', 'rand2'))
print('cor (rand1, rand2): ', df.stat.corr('rand1', 'rand2'))

cor (rand2, rand2):  1.0
cor (rand1, rand2):  -0.00048485287141450097


In [50]:
# User-defined functions in Spark SQL
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType

spark = (
    SparkSession.builder
    .appName("SquareFunction")
    .master("local[*]")
    .getOrCreate()
)

def square(s):
    """Return ``s`` multiplied by itself, or ``None`` when ``s`` is ``None``.

    The ``None`` guard keeps the function usable as a SQL UDF on columns that
    contain NULLs: a NULL reaches the Python function as ``None``, and
    ``None * None`` would raise ``TypeError``.
    """
    if s is None:
        return None
    return s * s

# Register the Python function as a SQL UDF returning a long
spark.udf.register("Square", square, LongType())

# Expose the ids 1..9999 as a temporary view
ids_df = spark.range(1, 10000)
ids_df.createOrReplaceTempView("udf_test")

# Run the query that applies the UDF to every id
squares_df = spark.sql("SELECT id, square(id) from udf_test")
squares_df.show()

+---+----------+
| id|Square(id)|
+---+----------+
|  1|         1|
|  2|         4|
|  3|         9|
|  4|        16|
|  5|        25|
|  6|        36|
|  7|        49|
|  8|        64|
|  9|        81|
| 10|       100|
| 11|       121|
| 12|       144|
| 13|       169|
| 14|       196|
| 15|       225|
| 16|       256|
| 17|       289|
| 18|       324|
| 19|       361|
| 20|       400|
+---+----------+
only showing top 20 rows

