In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) a SparkSession running on the "local" master, named
# "read", with Hive support enabled.
spark = (
    SparkSession.builder
    .master("local")
    .appName("read")
    .enableHiveSupport()
    .getOrCreate()
)

In [5]:
# Read the comma-separated source file into a Spark DataFrame: the first
# row supplies the column names and Spark is asked to infer column types.
df = spark.read.csv(
    path="./source.csv",
    sep=",",
    header=True,
    inferSchema=True,
)

In [6]:
# Preview the DataFrame; the default row limit and truncation are spelled out.
df.show(n=20, truncate=True)

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



In [7]:
# Load the same CSV file with pandas and display its first five rows.
df_pd = pd.read_csv(filepath_or_buffer="./source.csv", sep=",")
df_pd.head(n=5)


Unnamed: 0,source_id,source_username
0,100137,Merlene Blodgett
1,103582,Carmen Cura
2,106463,Richard Sanchez
3,119403,Betty De Hoyos
4,119555,Socorro Quiara


In [8]:
# pandas -> Spark: build a Spark DataFrame from the pandas DataFrame
# and print its first five rows.
df_spark = spark.createDataFrame(data=df_pd)
df_spark.show(n=5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



In [9]:
# Spark -> pandas: convert the Spark DataFrame back into a pandas DataFrame
# (this rebinds `df_pd`) and display its first five rows.
df_pd = df_spark.toPandas()
df_pd.head(n=5)

Unnamed: 0,source_id,source_username
0,100137,Merlene Blodgett
1,103582,Carmen Cura
2,106463,Richard Sanchez
3,119403,Betty De Hoyos
4,119555,Socorro Quiara


In [10]:
import uuid

# Build a unique table name ("df_" followed by a random UUID's hex digits)
# and persist the Spark DataFrame under that name.
table_name = f"df_{uuid.uuid4().hex}"
df.write.saveAsTable(table_name)

In [11]:
# Show the column names and data types of the saved table.
# The statement is built with an f-string, matching how the other SQL
# cells in this notebook interpolate `table_name`.
query = f"DESCRIBE {table_name}"
spark.sql(query).show()


+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|      source_id|   string|   null|
|source_username|   string|   null|
+---------------+---------+-------+



In [12]:
# Display the generated table name (a bare last expression is rendered as the cell output).
table_name

'df_daf213f7773247cbb1f6d7dd92d24095'

In [13]:
# List the databases visible to this Spark session.
databases = spark.sql("SHOW DATABASES")
databases.show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [14]:
# Switch to the "default" database, then list the tables it contains.
spark.sql("USE default")
tables = spark.sql("SHOW TABLES")
tables.show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|df_daf213f7773247...|      false|
+--------+--------------------+-----------+



In [15]:
# Describe the saved table's columns and data types.
describe_query = f"DESCRIBE {table_name}"
spark.sql(describe_query).show()

+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|      source_id|   string|   null|
|source_username|   string|   null|
+---------------+---------+-------+



In [16]:
# Query the saved table with SQL, limited to ten rows, and print the result.
preview_query = f"SELECT * FROM {table_name} LIMIT 10"
spark.sql(preview_query).show()

+---------+-------------------+
|source_id|    source_username|
+---------+-------------------+
|   100137|   Merlene Blodgett|
|   103582|        Carmen Cura|
|   106463|    Richard Sanchez|
|   119403|     Betty De Hoyos|
|   119555|     Socorro Quiara|
|   119868|Michelle San Miguel|
|   120752|     Eva T. Kleiber|
|   124405|          Lori Lara|
|   132408|      Leonard Silva|
|   135723|       Amy Cardenas|
+---------+-------------------+



In [17]:
# Select every row of the saved table into a DataFrame (this rebinds `df`)
# and print its first five rows.
select_all_query = f"SELECT * FROM {table_name}"
df = spark.sql(select_all_query)
df.show(n=5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



In [18]:
# Build a single-column ("id") DataFrame of the integers 0..999 and print the first five.
spark.range(0, 1000).show(n=5)

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
only showing top 5 rows



In [29]:
from pyspark.sql.functions import rand

# Attach a "uniform" column of seeded random values (seed 12345) to the
# 0..999 id range and print the first five rows.
uniform_col = rand(seed=12345)
spark.range(1000).withColumn("uniform", uniform_col).show(5)

+---+-------------------+
| id|            uniform|
+---+-------------------+
|  0| 0.3217855146445381|
|  1| 0.5926558057691951|
|  2| 0.3530876039804548|
|  3|0.18715752944048802|
|  4| 0.9700656793032243|
+---+-------------------+
only showing top 5 rows



In [32]:
# Generate a random Bernoulli random variable with p = 0.25.
# The comparison yields a boolean column; casting it to int turns it into 0/1.
bernoulli_col = (rand(12345) < 0.25).cast("int")
df = spark.range(1000).withColumn("Bernoulli", bernoulli_col)

In [34]:
# Summarise with the DataFrame (functional) API: number of rows per Bernoulli value.
bernoulli_counts = df.groupBy("Bernoulli").count()
bernoulli_counts.show()


+---------+-----+
|Bernoulli|count|
+---------+-----+
|        0|  753|
|        1|  247|
+---------+-----+



In [35]:
# Register the DataFrame as the temporary view "bern" so it can be queried with SQL.
df.createOrReplaceTempView("bern")

# Count rows per Bernoulli value using SQL. The adjacent literals are
# concatenated into exactly the same query text as before, including the
# run of spaces in front of FROM and GROUP BY.
bern_counts_sql = (
    "SELECT Bernoulli, COUNT(*) AS count "
    "            FROM bern "
    "            GROUP BY Bernoulli"
)
spark.sql(bern_counts_sql).show()


+---------+-----+
|Bernoulli|count|
+---------+-----+
|        0|  753|
|        1|  247|
+---------+-----+



In [36]:
from pyspark.sql.functions import randn

# Add a "normal" column to the 0..999 id range: seeded randn samples
# (seed 54321) scaled by 2 and shifted by 42. Then print the first five rows.
normal_col = 42 + 2 * randn(seed=54321)
mydf = spark.range(1000).withColumn("normal", normal_col)
mydf.show(n=5)

+---+------------------+
| id|            normal|
+---+------------------+
|  0|45.362885425117504|
|  1| 42.41317857967899|
|  2|46.135931767491265|
|  3| 39.81112359797484|
|  4| 44.19365809728162|
+---+------------------+
only showing top 5 rows



In [37]:
# Print summary statistics for the "id" and "normal" columns.
summary_stats = mydf.describe("id", "normal")
summary_stats.show()

+-------+-----------------+------------------+
|summary|               id|            normal|
+-------+-----------------+------------------+
|  count|             1000|              1000|
|   mean|            499.5| 41.98776215302001|
| stddev|288.8194360957494|2.0072883619960256|
|    min|                0|35.820280013529704|
|    max|              999| 48.94773330054072|
+-------+-----------------+------------------+

