In [1]:
from pyspark.sql import SparkSession

# Create the SparkSession used by every cell below: local master, application
# name "chapter_5". The parenthesised chain replaces backslash continuations,
# which break silently if trailing whitespace follows a backslash.
spark = (
    SparkSession.builder
    .master("local")
    .appName("chapter_5")
    .getOrCreate()
)

24/09/18 14:49:32 WARN Utils: Your hostname, gimsehyeon-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.62 instead (on interface en0)
24/09/18 14:49:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/18 14:49:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Read the JSON file without supplying a schema and display the schema Spark
# comes up with (shown as the cell output).
spark.read.load("./data/json/2015-summary.json", format="json").schema

StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

## Schema

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Declare the schema by hand: two nullable string columns followed by one
# nullable long column, in the same order as the fields in the JSON file.
myManualSchema = (
    StructType()
    .add("DEST_COUNTRY_NAME", StringType(), True)
    .add("ORIGIN_COUNTRY_NAME", StringType(), True)
    .add("count", LongType(), True)
)

# Load the same JSON file, this time with the hand-written schema applied.
df = (
    spark.read
    .format("json")
    .schema(myManualSchema)
    .load("./data/json/2015-summary.json")
)

In [7]:
# Preview the DataFrame; n=20 and truncate=True are the defaults of show(),
# spelled out here so the size of the preview is explicit.
df.show(n=20, truncate=True)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

## DataFrame의 트랜스포메이션

In [14]:
# Register df under the temporary view name "dfTable". No SQL query against
# this view is visible in this part of the notebook.
df.createOrReplaceTempView("dfTable")

In [15]:
from pyspark.sql import Row

# Build a single Row by position; the None falls in the second column
# (ORIGIN_COUNTRY_NAME), which myManualSchema declares as nullable.
myRow = Row("hello", None, 1)

# Wrap the row in a list and turn it into a one-row DataFrame with the manual schema.
myDf = spark.createDataFrame(data=[myRow], schema=myManualSchema)
myDf.show()

[Stage 5:>                                                          (0 + 1) / 1]

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            hello|               NULL|    1|
+-----------------+-------------------+-----+



                                                                                

## select와 selectExpr

In [16]:
from pyspark.sql.functions import expr, col, column

# Three ways of referring to the same column -- expr(), col() and column() --
# selected side by side; the output shows three identical columns.
(
    df
    .select(
        expr("DEST_COUNTRY_NAME"),
        col("DEST_COUNTRY_NAME"),
        column("DEST_COUNTRY_NAME"),
    )
    .show()
)

+--------------------+--------------------+--------------------+
|   DEST_COUNTRY_NAME|   DEST_COUNTRY_NAME|   DEST_COUNTRY_NAME|
+--------------------+--------------------+--------------------+
|       United States|       United States|       United States|
|       United States|       United States|       United States|
|       United States|       United States|       United States|
|               Egypt|               Egypt|               Egypt|
|       United States|       United States|       United States|
|       United States|       United States|       United States|
|       United States|       United States|       United States|
|          Costa Rica|          Costa Rica|          Costa Rica|
|             Senegal|             Senegal|             Senegal|
|             Moldova|             Moldova|             Moldova|
|       United States|       United States|       United States|
|       United States|       United States|       United States|
|              Guyana|   

In [18]:
# Keep every existing column ("*") and add a boolean column, withinCountry,
# that compares the destination and origin country names.
(
    df
    .selectExpr(
        "*",
        "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry",
    )
    .show()
)

+--------------------+-------------------+-----+-------------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+--------------------+-------------------+-----+-------------+
|       United States|            Romania|   15|        false|
|       United States|            Croatia|    1|        false|
|       United States|            Ireland|  344|        false|
|               Egypt|      United States|   15|        false|
|       United States|              India|   62|        false|
|       United States|          Singapore|    1|        false|
|       United States|            Grenada|   62|        false|
|          Costa Rica|      United States|  588|        false|
|             Senegal|      United States|   40|        false|
|             Moldova|      United States|    1|        false|
|       United States|       Sint Maarten|  325|        false|
|       United States|   Marshall Islands|   39|        false|
|              Guyana|      United States|   64|       

In [19]:
from pyspark.sql.functions import lit

# Select all existing columns plus a constant literal 1 exposed as column "One".
(
    df
    .select(
        expr("*"),
        lit(1).alias("One"),
    )
    .show()
)

+--------------------+-------------------+-----+---+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+--------------------+-------------------+-----+---+
|       United States|            Romania|   15|  1|
|       United States|            Croatia|    1|  1|
|       United States|            Ireland|  344|  1|
|               Egypt|      United States|   15|  1|
|       United States|              India|   62|  1|
|       United States|          Singapore|    1|  1|
|       United States|            Grenada|   62|  1|
|          Costa Rica|      United States|  588|  1|
|             Senegal|      United States|   40|  1|
|             Moldova|      United States|    1|  1|
|       United States|       Sint Maarten|  325|  1|
|       United States|   Marshall Islands|   39|  1|
|              Guyana|      United States|   64|  1|
|               Malta|      United States|    1|  1|
|            Anguilla|      United States|   41|  1|
|             Bolivia|      United States|   3

## 컬럼 추가하기

In [22]:
# Add a withinCountry column computed from an SQL expression comparing the
# origin and destination country names, then display the result.
(
    df
    .withColumn(
        "withinCountry",
        expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"),
    )
    .show()
)

+--------------------+-------------------+-----+-------------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+--------------------+-------------------+-----+-------------+
|       United States|            Romania|   15|        false|
|       United States|            Croatia|    1|        false|
|       United States|            Ireland|  344|        false|
|               Egypt|      United States|   15|        false|
|       United States|              India|   62|        false|
|       United States|          Singapore|    1|        false|
|       United States|            Grenada|   62|        false|
|          Costa Rica|      United States|  588|        false|
|             Senegal|      United States|   40|        false|
|             Moldova|      United States|    1|        false|
|       United States|       Sint Maarten|  325|        false|
|       United States|   Marshall Islands|   39|        false|
|              Guyana|      United States|   64|       

## 로우 필터링하기

In [23]:
# Keep only the rows whose count is below 2, using an SQL-string predicate.
(
    df
    .where("count < 2")
    .show()
)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Croatia|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|Saint Vincent and...|      United States|    1|
|            Suriname|      United States|    1|
|       United States|             Cyprus|    1|
|        Burkina Faso|      United States|    1|
|            Djibouti|      United States|    1|
|       United States|            Estonia|    1|
|              Zambia|      United States|    1|
|              Cyprus|      United States|    1|
|       United States|          Lithuania|    1|
|       United States|           Bulgaria|    1|
|       United States|            Georgia|    1|
|       United States|            Bahrain|    1|
|       Cote d'Ivoir

## 무작위 샘플 만들기 

In [24]:
# Sampling parameters: no replacement, a fraction of 0.5, and a fixed seed so
# the sample is repeatable across runs.
withReplacement = False
fraction = 0.5
seed = 5

# Pass the parameters by keyword so the call does not depend on argument order.
(
    df
    .sample(withReplacement=withReplacement, fraction=fraction, seed=seed)
    .show()
)

+--------------------+--------------------+-----+
|   DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+--------------------+--------------------+-----+
|       United States|             Romania|   15|
|       United States|             Croatia|    1|
|       United States|               India|   62|
|       United States|           Singapore|    1|
|       United States|             Grenada|   62|
|             Senegal|       United States|   40|
|             Moldova|       United States|    1|
|       United States|    Marshall Islands|   39|
|              Guyana|       United States|   64|
|             Bolivia|       United States|   30|
|       United States|            Paraguay|    6|
|       United States|           Gibraltar|    1|
|Saint Vincent and...|       United States|    1|
|               Italy|       United States|  382|
|       United States|Federated States ...|   69|
|       United States|              Russia|  161|
|       United States|         Netherlands|  660|


## 로우 합치기와 추가하기

In [26]:
# Reuse df's schema so the hand-made rows have the same columns as df.
schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1),
]

# Turn the rows into an RDD first, then into a DataFrame with df's schema.
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDf = spark.createDataFrame(parallelizedRows, schema)

# Append the new rows to df, then keep rows with count = 1 whose origin is not
# "United States". The recorded output includes the appended "New Country 2" row.
(
    df
    .union(newDf)
    .where("count = 1")
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")
    .show()
)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



## repartition과 coalesce

In [27]:
# Show how many partitions back df, via its underlying RDD (the recorded
# output for this run is 1).
df.rdd.getNumPartitions()

1

In [28]:
# Request a repartitioning into 5 partitions. The call returns a DataFrame
# (see the cell output); it is not assigned, so the name df is not rebound.
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [29]:
# Request a repartitioning keyed on the DEST_COUNTRY_NAME column. The returned
# DataFrame is only displayed, not assigned.
df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [30]:
# Combine both forms: 5 partitions, keyed on the DEST_COUNTRY_NAME column.
# The returned DataFrame is only displayed, not assigned.
df.repartition(5, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]