hadoop fs -ls
hadoop fs -copyFromLocal <local source> <hdfs dest>
hadoop fs -rm <hdfs file>
hadoop fs -rmdir <hdfs folder>
etc... check available commands with:
hadoop fs -help
https://hadoop.apache.org/docs/r1.0.4/webhdfs.html
curl --negotiate -u : http://hdfs....cloud:50070/webhdfs/v1/user/a.jourdan-dsti/raw?op=LISTSTATUS
curl --negotiate -L -u : http://hdfs....cloud:50070/webhdfs/v1/user/a.jourdan-dsti/raw/input.txt?op=OPEN
- Mapper: transform each input line to Key Value pair
- Shuffle & sort: group values for each key
- Reducer: process each key values (ex: count nb of values)
from mrjob.job import MRJob
from mrjob.step import MRStep


class RatingsBreakdown(MRJob):
    """Count how many times each rating value occurs in the ml-100k u.data file."""

    def steps(self):
        # Single map/reduce pass: emit (rating, 1) per line, then sum per rating.
        count_step = MRStep(mapper=self.mapper_get_rating,
                            reducer=self.reducer_count_ratings)
        return [count_step]

    def mapper_get_rating(self, _, line):
        # u.data rows are tab-separated: user, movie, rating, timestamp.
        (user_id, movie_id, rating, timestamp) = line.split('\t')
        yield rating, 1

    def reducer_count_ratings(self, key, values):
        # values is a stream of 1s for this rating value.
        yield key, sum(values)


if __name__ == "__main__":
    RatingsBreakdown.run()
run locally:
python RatingsBreakdown.py u.data
run with hadoop:
python RatingsBreakdown.py -r hadoop --hadoop-streaming-jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar u.data
su root
ambari-admin-password-reset
-- Ratings: tab-separated by default.
ratings = LOAD '/user/maria_dev/ml-100k/u.data'
    AS (userID:int, movieID:int, rating:int, ratingTime:int);

-- Movie metadata is pipe-delimited.
metadata = LOAD '/user/maria_dev/ml-100k/u.item' USING PigStorage('|')
    AS (movieID:int, movieTitle:chararray, releaseDate:chararray, videoRelease:chararray, imdbLink:chararray);

-- Keep id/title plus the release date converted to epoch seconds, for sorting.
nameLookup = FOREACH metadata GENERATE movieID, movieTitle,
    ToUnixTime(ToDate(releaseDate, 'dd-MMM-yyyy')) AS releaseTime;

-- Average rating per movie.
ratingsByMovie = GROUP ratings BY movieID;
avgRatings = FOREACH ratingsByMovie GENERATE group AS movieID, AVG(ratings.rating) AS avgRating;

-- Movies averaging above 4.0, joined back to their titles, oldest release first.
fiveStarMovies = FILTER avgRatings BY avgRating > 4.0;
fiveStarsWithData = JOIN fiveStarMovies BY movieID, nameLookup BY movieID;
oldestFiveStarMovies = ORDER fiveStarsWithData BY nameLookup::releaseTime;
DUMP oldestFiveStarMovies;
-
Spark Core
-
Additional libraries:
- spark streaming
- spark SQL
- MLLib - machine learning
- GraphX
sc.textFile("file://") # or S3:// or hdfs://
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT ... FROM ...")
map # (same nb of rows input/output)
flatMap
filter
distinct
sample
union, intersection, subtract, cartesian
map example:
rdd = sc.parallelize([1,2,3,4])
squaredRDD = rdd.map(lambda x: x*x)
# return 1,4,9,16
# equivalent to
def squareIt(x):
return x*x
rdd.map(squareIt)
collect
count
countByValue
take
top
reduce
...
nothing happens until an action is called !
run python spark
spark-submit <script>.py
Spark SQL extends RDDs to DataFrames (query them with SQL)
-- Helper view: movies ranked by how many ratings they received.
CREATE VIEW IF NOT EXISTS topMovieIDs AS
SELECT
    movieID,
    COUNT(movieID) AS ratingCount
FROM ratings
GROUP BY movieID
ORDER BY ratingCount DESC;

-- Resolve the ranked movie ids to human-readable titles.
SELECT
    n.title,
    t.ratingCount
FROM topMovieIDs t
INNER JOIN names n
    ON t.movieID = n.movieID;

-- Clean up the helper view.
DROP VIEW topMovieIDs;
Create external table pointing to drivers.csv
-- External table over the raw drivers.csv already in HDFS: Hive reads the
-- files in place, and DROP TABLE leaves the underlying data untouched.
CREATE EXTERNAL TABLE IF NOT EXISTS a_jourdan(
driverId INT, name STRING, ssn INT,
location STRING, certified STRING, wageplan STRING)
COMMENT 'a.jourdan-dsti table from drivers.csv'
-- Plain comma-separated text, one driver per line.
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
-- Directory (not a single file) that contains drivers.csv.
LOCATION '/user/a.jourdan-dsti/drivers_raw'
-- Skip the CSV header row when reading.
TBLPROPERTIES ('skip.header.line.count' = '1');
Or Use query stored in file
!run hive/drivers_create_external.hql
Create ORC table (optimized format)
-- Managed table stored as ORC (columnar, compressed). ORC is a self-describing
-- binary format, so the original ROW FORMAT DELIMITED / FIELDS TERMINATED BY
-- clause was meaningless here and has been removed.
CREATE TABLE IF NOT EXISTS a_jourdan_orc(
driverId INT, name STRING, ssn INT,
location STRING, certified STRING, wageplan STRING)
COMMENT 'a.jourdan-dsti table from drivers.csv'
STORED AS ORC
-- NOTE(review): managed table with an explicit LOCATION — dropping the table
-- deletes the data under this directory.
LOCATION '/user/a.jourdan-dsti/drivers_orc';
Insert data from external table
-- Copy the raw CSV rows into the ORC table. Explicit column list instead of
-- SELECT * so the statement fails loudly (rather than silently mis-mapping
-- columns) if either table's schema drifts.
INSERT INTO TABLE a_jourdan_orc
SELECT driverId, name, ssn, location, certified, wageplan
FROM a_jourdan;
Joined Query
- never do:
SELECT * FROM TOTO,TATA WHERE TOTO.TOTOID = TATA.TOTOID
- use
SELECT * FROM TOTO JOIN TATA ON TOTO.TOTOID = TATA.TOTOID
- Number of titles longer than 2 hours.
-- Titles whose runtime exceeds two hours.
-- COUNT(primarytitle) counts only rows with a non-NULL title, which is what
-- the recorded result (60446) was produced with.
SELECT COUNT(primarytitle)
FROM imdb_title_basics
WHERE runtimeminutes > 120;
RESULT: 60446
- Average duration of titles containing the string "world".
-- Average runtime of titles whose primary title contains "world".
-- NOTE(review): LIKE is case-sensitive, so "World" does not match — confirm
-- that is intended; the recorded result assumes lowercase-only matching.
SELECT AVG(runtimeminutes)
FROM imdb_title_basics
WHERE primarytitle LIKE '%world%';
RESULT: 43.58105263157895
- Average rating of titles having the genre "Comedy"
-- Average rating over titles tagged with the "Comedy" genre
-- (genres is an array column, hence array_contains).
SELECT AVG(r.averagerating)
FROM imdb_title_basics b
INNER JOIN imdb_title_ratings r
    ON b.tconst = r.tconst
WHERE array_contains(genres, 'Comedy');
RESULT: 6.970428788330675
- Average rating of titles not having the genre "Comedy"
-- Average rating over titles NOT tagged "Comedy".
-- NOTE(review): rows with a NULL genres array yield NULL from array_contains
-- and are excluded from both this query and the "Comedy" one above.
SELECT AVG(r.averagerating)
FROM imdb_title_basics b
INNER JOIN imdb_title_ratings r
    ON b.tconst = r.tconst
WHERE NOT array_contains(genres, 'Comedy');
RESULT: 6.886042545766032
- Top 10 movies directed by Quentin Tarantino
-- Top 10 Tarantino-directed movies by average rating.
SELECT primarytitle,
       averagerating
FROM
    -- Titles whose crew lists Tarantino among the directors.
    (SELECT tconst
     FROM imdb_title_crew
     WHERE array_contains(director, (
         -- Scalar subquery resolving the name id. Equality replaces the
         -- original no-wildcard LIKE, which was just a slower '='.
         -- NOTE(review): this errors if several people share the exact
         -- name 'Quentin Tarantino' — consider pinning nconst directly.
         SELECT nconst
         FROM imdb_name_basics
         WHERE primaryname = 'Quentin Tarantino'))
    ) AS qt
INNER JOIN imdb_title_basics
    ON qt.tconst = imdb_title_basics.tconst
INNER JOIN imdb_title_ratings
    ON imdb_title_ratings.tconst = imdb_title_basics.tconst
WHERE imdb_title_basics.titletype = 'movie'  -- feature films only, no TV/shorts
ORDER BY averagerating DESC
LIMIT 10;
RESULT:
+-------------------------------------+----------------+
| primarytitle | averagerating |
+-------------------------------------+----------------+
| Pulp Fiction | 8.9 |
| Kill Bill: The Whole Bloody Affair | 8.8 |
| Django Unchained | 8.4 |
| Reservoir Dogs | 8.3 |
| Inglourious Basterds | 8.3 |
| Kill Bill: Vol. 1 | 8.1 |
| Kill Bill: Vol. 2 | 8.0 |
| Sin City | 8.0 |
| The Hateful Eight | 7.8 |
| Grindhouse | 7.6 |
+-------------------------------------+----------------+
sqoop import --connect jdbc:mysql://localhost/movielens --driver com.mysql.jdbc.Driver --table movies -m 1
sqoop import --connect jdbc:mysql://localhost/movielens --driver com.mysql.jdbc.Driver --table movies -m 1 --hive-import
on HDP files in apps/hive/warehouse
sqoop export --connect jdbc:mysql://localhost/movielens --driver com.mysql.jdbc.Driver --table exported_movies --export-dir /apps/hive/warehouse/movies -m 1 --input-fields-terminated-by '\0001'
start Rest service
/usr/hdp/current/hbase-master/bin/hbase-daemon.sh start rest -p 8000 --infoport 8001
python script
from starbase import Connection

# HBase REST endpoint (started above with hbase-daemon.sh on port 8000).
c = Connection("127.0.0.1", "8000")

# Recreate the 'ratings' table with a single 'rating' column family.
ratings = c.table("ratings")
if ratings.exists():
    print("drop existing ratings")
    ratings.drop()
ratings.create('rating')

print("parsing the ml100k ratings data...")
batch = ratings.batch()
# 'with' guarantees the file handle is closed even if a row fails to parse
# (the original leaked the handle on any exception before close()).
with open("/home/terman37/MyGit/Udemy_Hadoop/datas/ml-100k/u.data", "r") as ratingFile:
    for line in ratingFile:
        # u.data columns: userID, movieID, rating, timestamp (whitespace-separated).
        (userID, movieID, rating, timestamp) = line.split()
        # Row key = userID; one column per movie inside the 'rating' family.
        batch.update(userID, {'rating': {movieID: rating}})

print("Committing ratings to Hbase via Rest")
batch.commit(finalize=True)

print("get back ratings for some users")
print("rating for user 1")
print(ratings.fetch("1"))
print("rating for user 33")
print(ratings.fetch("33"))
stop Rest service
/usr/hdp/current/hbase-master/bin/hbase-daemon.sh stop rest
-
create table
hbase shell => create 'users','userinfo' => list => exit
-
run
pig hbase.pig
-
hbase.pig content
-- Load u.user (pipe-delimited) and write it into the HBase 'users' table.
-- The first field (userID) becomes the row key; the remaining fields map
-- onto the listed 'userinfo' columns, in order.
users = LOAD '/user/maria_dev/ml-100k/u.user' USING PigStorage('|')
    AS (userID:int, age:int, gender:chararray, occupation:chararray, zip:int);
STORE users INTO 'hbase://users'
    USING org.apache.pig.backend.hadoop.hbase.HBaseStorage (
        'userinfo:age,userinfo:gender,userinfo:occupation,userinfo:zip');
-
-
check if ok
hbase shell => scan 'users'
-
drop table
=> disable 'users' => drop 'users' => list => exit
- Connect
hbase shell
- show tables
list
- create table
create 'dsti_ajourdan_imdb_rating','opinion','metadata'
create 'table_name','col-family-1','col-family-2'...
- Write in table
put 'dsti_ajourdan_imdb_rating','tt0266697-8.2-ajourdan','opinion:rating','8.2'
put 'dsti_ajourdan_imdb_rating','tt0378194-8.5-ajourdan','opinion:rating','8.5'
put 'dsti_ajourdan_imdb_rating','tt0378194-8.5-ajourdan','metadata:title','Kill Bill 2'
put 'dsti_ajourdan_imdb_rating','tt0266697-8.2-ajourdan','metadata:title','Kill Bill 1'
put 'dsti_ajourdan_imdb_rating','tt0378194-8.5-ajourdan','metadata:tconst','tt0378194'
# fixed: the tconst value was being written into metadata:title, clobbering
# the 'Kill Bill 1' title set above; it belongs in metadata:tconst
# (mirroring the tt0378194 row).
put 'dsti_ajourdan_imdb_rating','tt0266697-8.2-ajourdan','metadata:tconst','tt0266697'
# general form:
put 'table_name','rowkey','col-family:column','value'
- read 1 line
get 'dsti_ajourdan_imdb_rating','tt0266697-8.2-ajourdan'
get 'table_name','rowkey'
- show all table data
scan 'dsti_ajourdan_imdb_rating'
scan 'table_name'
- delete row
deleteall '<table_name>', '<row_key>'