The examples below are based on the Spark RDD Programming Guide:

* [https://spark.apache.org/docs/latest/rdd-programming-guide.html](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

**PySpark operations on RDDs**

In [1]:
from pyspark import SparkContext, SparkConf

In [2]:
conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

**1. Read the iris data (`../data/iris.csv`) and show the first ten lines.**

In [3]:
iris = sc.textFile("../data/iris.csv")
iris.take(10)    

['5.1,3.5,1.4,0.2,setosa',
 '4.9,3,1.4,0.2,setosa',
 '4.7,3.2,1.3,0.2,setosa',
 '4.6,3.1,1.5,0.2,setosa',
 '5,3.6,1.4,0.2,setosa',
 '5.4,3.9,1.7,0.4,setosa',
 '4.6,3.4,1.4,0.3,setosa',
 '5,3.4,1.5,0.2,setosa',
 '4.4,2.9,1.4,0.2,setosa',
 '4.9,3.1,1.5,0.1,setosa']

**2. From the iris data, select the lines whose last column is 'setosa' and show the first ten.**

In [4]:
iris.filter(lambda line: 'setosa' == line.split(',')[-1]).take(10)

['5.1,3.5,1.4,0.2,setosa',
 '4.9,3,1.4,0.2,setosa',
 '4.7,3.2,1.3,0.2,setosa',
 '4.6,3.1,1.5,0.2,setosa',
 '5,3.6,1.4,0.2,setosa',
 '5.4,3.9,1.7,0.4,setosa',
 '4.6,3.4,1.4,0.3,setosa',
 '5,3.4,1.5,0.2,setosa',
 '4.4,2.9,1.4,0.2,setosa',
 '4.9,3.1,1.5,0.1,setosa']
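The predicate passed to `filter` is ordinary Python, so it can be checked on a local list without a Spark context. The following is a minimal sketch, assuming the five-column layout shown above; `is_species` and `sample_lines` are hypothetical names introduced for illustration, not part of PySpark.

```python
def is_species(line, species):
    """Return True when the last comma-separated field equals `species`."""
    return line.split(',')[-1] == species

# Two hand-picked lines in the same format as iris.csv
sample_lines = [
    '5.1,3.5,1.4,0.2,setosa',
    '6.4,3.2,4.5,1.5,versicolor',
]

# Plain-Python counterpart of iris.filter(...) applied to a local list
setosa_lines = [line for line in sample_lines if is_species(line, 'setosa')]
print(setosa_lines)
```

Comparing the last field exactly is stricter than a substring test such as `'setosa' in line`, which would also match any line that merely contains that text somewhere.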

**3. From the iris data, sample 10 lines with replacement.**

In [5]:
# takeSample is an action that returns a list directly, so no transformation is needed
iris.takeSample(True, 10)

['6.4,3.2,4.5,1.5,versicolor',
 '4.6,3.1,1.5,0.2,setosa',
 '5.7,2.6,3.5,1,versicolor',
 '6.1,2.6,5.6,1.4,virginica',
 '5.7,4.4,1.5,0.4,setosa',
 '6.2,2.9,4.3,1.3,versicolor',
 '7.7,2.8,6.7,2,virginica',
 '7.2,3.2,6,1.8,virginica',
 '6.6,2.9,4.6,1.3,versicolor',
 '7.7,2.8,6.7,2,virginica']

**4. From the iris data, sample the 'setosa' lines and the 'versicolor' lines, each with a sampling fraction of 1/10, then take the union of the two samples.**

In [6]:
# sample(withReplacement, fraction): the size of the result is random, not exactly 1/10 of the input
setosa = iris.filter(lambda line: 'setosa' in line).sample(True, 0.1)
versicolor = iris.filter(lambda line: 'versicolor' in line).sample(True, 0.1)

setosa.union(versicolor).collect()

['5.1,3.7,1.5,0.4,setosa',
 '5,3.2,1.2,0.2,setosa',
 '6.4,3.2,4.5,1.5,versicolor',
 '6.9,3.1,4.9,1.5,versicolor',
 '5.2,2.7,3.9,1.4,versicolor',
 '6.7,3.1,4.4,1.4,versicolor',
 '6.2,2.2,4.5,1.5,versicolor',
 '6.3,2.5,4.9,1.5,versicolor',
 '6.1,2.8,4.7,1.2,versicolor',
 '5.5,2.4,3.8,1.1,versicolor',
 '5.5,2.4,3.7,1,versicolor',
 '6,2.7,5.1,1.6,versicolor',
 '5.5,2.5,4,1.3,versicolor',
 '5.7,3,4.2,1.2,versicolor']
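The two samples above differ in size (2 'setosa' lines and 12 'versicolor' lines) because `sample` takes a fraction, so the number of returned elements is random, whereas `takeSample` returns exactly the requested count. The plain-Python sketch below illustrates that effect with independent per-element draws. It is an analogy only and not Spark's implementation (sampling with replacement in Spark may also return the same element more than once); `bernoulli_sample` is a hypothetical helper.

```python
import random

def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < fraction]

population = list(range(50))  # stand-in for the 50 lines of one species

# Different seeds give different sample sizes, centered around 50 * 0.1 = 5
sizes = [len(bernoulli_sample(population, 0.1, seed)) for seed in range(5)]
print(sizes)
```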

**5. From the iris data, build key-value pairs whose key is the last column (species) and whose value is the first column, then count the records per key.**

In [7]:
# split each line once, then emit (species, first column as float)
key_value_iris = iris.map(lambda line: line.split(',')).map(lambda cols: (cols[-1], float(cols[0])))
key_value_iris.countByKey()

defaultdict(int, {'setosa': 50, 'versicolor': 50, 'virginica': 50})

**6. Sum the values of the key-value data per key (species).**

In [8]:
key_value_iris.reduceByKey(lambda a, b: a + b).collect()

[('setosa', 250.29999999999998),
 ('versicolor', 296.8),
 ('virginica', 329.3999999999999)]
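Conceptually, `reduceByKey` merges all values that share a key using the supplied function. The sketch below mimics that idea in plain Python on a small hand-made list of pairs. `reduce_by_key` is a hypothetical helper that runs on a single machine, so it only illustrates the semantics and not how Spark distributes the work.

```python
def reduce_by_key(pairs, func):
    """Combine the values of each key with `func`, like a local reduceByKey."""
    result = {}
    for key, value in pairs:
        # First value for a key is stored as-is; later ones are merged in
        result[key] = func(result[key], value) if key in result else value
    return result

pairs = [('setosa', 5.1), ('versicolor', 7.0), ('setosa', 4.9)]
totals = reduce_by_key(pairs, lambda a, b: a + b)
print(totals)
```

The trailing digits in results such as 250.29999999999998 come from binary floating-point addition rather than from Spark; rounding the totals for display removes them.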

**7. Sort the key-value data by key in descending order and show the first 10 records.**

In [9]:
key_value_iris.sortByKey(ascending=False).take(10)

[('virginica', 6.3),
 ('virginica', 5.8),
 ('virginica', 7.1),
 ('virginica', 6.3),
 ('virginica', 6.5),
 ('virginica', 7.6),
 ('virginica', 4.9),
 ('virginica', 7.3),
 ('virginica', 6.7),
 ('virginica', 7.2)]
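'virginica' comes first because the keys are compared as strings and the order is descending. The same ordering can be reproduced in plain Python on a few hand-picked pairs (the values here are chosen only for illustration):

```python
pairs = [('setosa', 5.1), ('virginica', 6.3), ('versicolor', 7.0), ('virginica', 5.8)]

# Sort by the key only, in descending (reverse alphabetical) order
by_key_desc = sorted(pairs, key=lambda kv: kv[0], reverse=True)
print(by_key_desc)
```

Only the key takes part in the comparison, so the values within one key are not sorted by this operation.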

**8. Show the number of rows in the iris data.**

In [10]:
iris.count()

150

**9. Using map() and reduce(), calculate the sum of all values in the iris data, excluding the species column.**

In [11]:
iris.map(lambda line: line.split(',')[:-1]).map(lambda line: sum([float(fac) for fac in line])).reduce(lambda a,b:a+b)

2078.1999999999994

In [12]:
# The same computation in smaller steps
# drop the species column
value_columns = iris.map(lambda line: line.split(',')[:-1])

# sum per row
sum_per_row = value_columns.map(lambda line: sum([float(fac) for fac in line]))

# total
sum_per_row.reduce(lambda a,b: a+b)

2078.1999999999994
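The same map-then-reduce pattern can be traced on a local list to see what each stage produces. This is a sketch in plain Python using the first three lines of the data shown earlier, with `functools.reduce` standing in for the RDD `reduce` action.

```python
from functools import reduce

lines = [
    '5.1,3.5,1.4,0.2,setosa',
    '4.9,3,1.4,0.2,setosa',
    '4.7,3.2,1.3,0.2,setosa',
]

# map step 1: split each line and drop the species column
value_columns = [line.split(',')[:-1] for line in lines]
# map step 2: sum the numeric fields of each row
sum_per_row = [sum(float(x) for x in cols) for cols in value_columns]
# reduce step: add the row sums together
total = reduce(lambda a, b: a + b, sum_per_row)
print(round(total, 1))
```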

**PySpark operations on DataFrames**

In [13]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


**10. Read the cat data (`../data/cat.json`).**

In [14]:
# load json
cat = spark.read.json("../data/cat.json")

**11. Check the types of iris and cat.**

In [15]:
print("iris: {}".format(type(iris)))
print("cat: {}".format(type(cat)))

iris: <class 'pyspark.rdd.RDD'>
cat: <class 'pyspark.sql.dataframe.DataFrame'>


**12. Show the cat data.**

In [16]:
cat.show()

+--------+----+
|    name|size|
+--------+----+
| Deborah|  30|
|Zedekiah|  80|
|    Nina|  50|
+--------+----+



**13. Show the schema of the cat data.**

In [17]:
cat.printSchema()

root
 |-- name: string (nullable = true)
 |-- size: string (nullable = true)
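The schema reports `size` as a string rather than a number, presumably because the values are quoted in the JSON file. That matters for sorting, because strings are ordered character by character. The plain-Python illustration below uses the three sizes from the table above plus one made-up value, '100', which is an assumption added only to make the difference visible.

```python
sizes = ['30', '80', '50', '100']  # '100' is a hypothetical value for illustration

as_text = sorted(sizes)              # lexicographic order: '100' sorts before '30'
as_numbers = sorted(sizes, key=int)  # numeric order after converting to int

print(as_text)
print(as_numbers)
```

In Spark, casting the column to an integer type (for example with `cat["size"].cast("int")`) before sorting avoids the lexicographic ordering.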



**14. Select the 'name' column of the cat data and show it.**

In [18]:
cat.select("name").show()

+--------+
|    name|
+--------+
| Deborah|
|Zedekiah|
|    Nina|
+--------+



**15. From the cat data, select the rows whose 'name' column is 'Deborah' and show them.**

In [19]:
cat.filter(cat["name"] == "Deborah").show()

+-------+----+
|   name|size|
+-------+----+
|Deborah|  30|
+-------+----+



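**16. Using spark.sql(), show all the cat data.**

In [20]:
# Global temporary views are registered in the global_temp database;
# createOrReplaceGlobalTempView keeps the cell re-runnable
cat.createOrReplaceGlobalTempView("cat")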

In [21]:
spark.sql("SELECT * FROM global_temp.cat").show()

+--------+----+
|    name|size|
+--------+----+
| Deborah|  30|
|Zedekiah|  80|
|    Nina|  50|
+--------+----+



**17. Convert the iris data (whose type is `<class 'pyspark.rdd.RDD'>`) to a DataFrame and show it.**

In [22]:
iris_tuple = iris.map(lambda line: line.split(',')).map(lambda line: list(map(float, line[:-1])) + [str(line[-1])])

iris_dataframe = spark.createDataFrame(iris_tuple, ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species'])
iris_dataframe.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.3|        3.0|         1.1| ...
(output truncated)
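The per-line conversion applied before `createDataFrame` can be tried on a single string to see the row it produces. This is a minimal sketch in plain Python; `parse_line` is a hypothetical name for the same logic as the two `map` calls above.

```python
def parse_line(line):
    """Split a CSV line into four floats followed by the species string."""
    cols = line.split(',')
    return [float(x) for x in cols[:-1]] + [cols[-1]]

row = parse_line('4.9,3,1.4,0.2,setosa')
print(row)
```

Note that the text '3' becomes the float 3.0, which matches the 3.0 displayed in the sepal_width column of the table above.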

**18. Check the type of each column of the iris DataFrame.**

In [23]:
iris_dataframe.dtypes

[('sepal_length', 'double'),
 ('sepal_width', 'double'),
 ('petal_length', 'double'),
 ('petal_width', 'double'),
 ('species', 'string')]

**19. Using spark.sql(), select the rows from iris where sepal_width > 3.0 and show them.**

In [24]:
# register the DataFrame as a temporary view so that SQL can reference it as "iris"
iris_dataframe.createOrReplaceTempView("iris")
spark.sql("SELECT * FROM iris WHERE sepal_width > 3.0").show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         5.8|        4.0|         1.2|        0.2| setosa|
|         5.7|        4.4|         1.5|        0.4| setosa|
|         5.4|        3.9|         1.3|        0.4| setosa|
|         5.1|        3.5|         1.4| ...
(output truncated)