In [1]:
import findspark

In [2]:
findspark.init("/usr/hdp/current/spark2-client/")

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder \
.master("yarn") \
.appName("Dataframe-Giris") \
.getOrCreate()

In [5]:
sc = spark.sparkContext

## Liste ve RDD'den Dataframe

In [6]:
listem = [1,2,3,4,5,6]

In [17]:
from pyspark.sql import Row
my_rdd = sc.parallelize(listem) \
.map(lambda x: (x,)) \

In [18]:
my_rdd.take(3)

[(1,), (2,), (3,)]

In [19]:
my_df = my_rdd.toDF(["rakamlar"])

In [20]:
my_df.show()

+--------+
|rakamlar|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
+--------+



## range ile DF

In [12]:
df_range = sc.parallelize(range(10,100,5)) \
.map(lambda x: (x,)) \
.toDF(["range"])

In [21]:
df_range.show(5)

+-----+
|range|
+-----+
|   10|
|   15|
|   20|
|   25|
|   30|
+-----+
only showing top 5 rows



## Dosyadan DF

In [35]:
df_from_file = spark.read.format("csv") \
.option("sep",";") \
.option("header",True) \
.option("inferSchema",True) \
.load("/user/erkan/OnlineRetail.csv")

In [41]:
df_from_file.limit(10).toPandas().head(10)

Unnamed: 0,InvoiceNo,UnitPrice
0,536365,255
1,536365,339
2,536365,275
3,536365,339
4,536365,339
5,536365,765
6,536365,425
7,536366,185
8,536366,185
9,536367,169


In [38]:
df_from_file.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [40]:
from pyspark.sql import functions as f
df_from_file.groupBy("Country").agg(f.sum("UnitPrice")).printSchema()

root
 |-- Country: string (nullable = true)
 |-- sum(UnitPrice): double (nullable = true)



In [42]:
df_from_file.select("InvoiceNo","UnitPrice").limit(10).toPandas().head(10)

Unnamed: 0,InvoiceNo,UnitPrice
0,536365,255
1,536365,339
2,536365,275
3,536365,339
4,536365,339
5,536365,765
6,536365,425
7,536366,185
8,536366,185
9,536367,169


In [43]:
df_from_file.select("InvoiceNo","UnitPrice") \
.sort("UnitPrice") \
.limit(10).toPandas().head(10)

Unnamed: 0,InvoiceNo,UnitPrice
0,A563186,-1106206
1,A563187,-1106206
2,536998,0
3,536997,0
4,536549,0
5,536414,0
6,536764,0
7,536546,0
8,536765,0
9,536941,0


In [44]:
df_from_file.select("InvoiceNo","UnitPrice") \
.sort(f.desc("UnitPrice")) \
.limit(10).toPandas().head(10)

Unnamed: 0,InvoiceNo,UnitPrice
0,553549,9996
1,559506,9996
2,579787,988
3,C579195,98714
4,573633,9879
5,556094,9875
6,555320,9854
7,C570456,9818
8,555278,9796
9,556092,9779


In [49]:
df_from_file.select("InvoiceNo","UnitPrice") \
.sort(f.desc("UnitPrice")).limit(10).explain()

== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[UnitPrice#180 DESC NULLS LAST], output=[InvoiceNo#175,UnitPrice#180])
+- *FileScan csv [InvoiceNo#175,UnitPrice#180] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://sandbox-hdp.hortonworks.com:8020/user/erkan/OnlineRetail.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,UnitPrice:string>
