
Apache Spark with Hive and MySQL

  • https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

        String hiveZookeeperLocation = "jdbc:hive2://xx.xx.xx.xx:2181,yy.yy.yy.yy:2181,zz.zz.zz.zz:2181/;serviceDiscoveryMode=zookeeper;zookeeperNameSpace=hiveserver2";

        SparkSession spark = SparkSession.builder()
                .appName("VaquarKhan-SparkHiveExample")
                .master("local[*]")
                // .config("spark.sql.warehouse.dir", hiveZookeeperLocation)
                .config("hive.metastore.uris", hiveZookeeperLocation) // e.g. "thrift://localhost:9083"
                .config("hive.mapred.supports.subdirectories", "true")
                .config("spark.driver.allowMultipleContexts", "true")
                .config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
                .config("checkpointLocation", "/tmp/hive") // <-- checkpoint directory
                .config("spark.sql.warehouse.dir", "//xx.xx.xx.xx/apps/hive/warehouse")
                .enableHiveSupport()
                .getOrCreate();
    
  • https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
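With Hive support enabled, the session built above can query Hive tables directly. A minimal sketch (the table name is only an example, and the usual org.apache.spark.sql.Dataset/Row imports are assumed):

		// Query a Hive table through the Hive-enabled SparkSession
		Dataset<Row> hiveDf = spark.sql("SELECT * FROM vkhan.testTable");
		hiveDf.show(10); // print the first 10 rows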


This setup helps to resolve the following error when you have no admin access on your machine and cannot run "chmod 777" on "C:\tmp\hive".

Error:

org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: 
/tmp/hive on HDFS should be writable. Current permissions are: rw-rw-rw-;
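If you cannot change permissions at all, one workaround (a sketch, not guaranteed for every Hive/Spark version) is to point Hive's scratch directory at a path your user can already write to, using the same builder pattern as above; the path below is only an example:

		SparkSession spark = SparkSession.builder()
				.appName("ScratchDirWorkaround")
				.master("local[*]")
				// hive.exec.scratchdir is a standard Hive setting; the path is an example
				.config("hive.exec.scratchdir", "C:/Users/<your-user>/tmp/hive")
				.enableHiveSupport()
				.getOrCreate();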

			<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
				xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
				<modelVersion>4.0.0</modelVersion>
				<groupId>com.khan.vaquar</groupId>
				<artifactId>Example-java-spark</artifactId>
				<packaging>jar</packaging>
				<version>1.0-SNAPSHOT</version>
				<name>NumberGenerator</name>
				<url>http://maven.apache.org</url>

				<properties>
					<java.version>1.8</java.version>
				</properties>
				<dependencies>
					<dependency>
						<groupId>org.apache.spark</groupId>
						<artifactId>spark-core_2.12</artifactId>
						<version>2.4.0</version>
					</dependency>
					<dependency>
						<groupId>org.apache.spark</groupId>
						<artifactId>spark-sql_2.12</artifactId>
						<version>2.4.0</version>
					</dependency>
					<dependency>
						<groupId>org.apache.spark</groupId>
						<artifactId>spark-mllib_2.12</artifactId>
						<version>2.4.0</version>
						<scope>runtime</scope>
					</dependency>
					<dependency>
						<groupId>org.apache.spark</groupId>
						<artifactId>spark-streaming_2.12</artifactId>
						<version>2.4.0</version>
						<scope>provided</scope>
					</dependency>
					<dependency>
						<groupId>org.apache.spark</groupId>
						<artifactId>spark-hive_2.12</artifactId>
						<version>2.4.0</version>
						<scope>provided</scope>
					</dependency>
					<dependency>
						<groupId>com.thoughtworks.paranamer</groupId>
						<artifactId>paranamer</artifactId>
						<version>2.8</version>
					</dependency>
					<dependency>
						<groupId>junit</groupId>
						<artifactId>junit</artifactId>
						<version>4.11</version>
						<scope>test</scope>
					</dependency>
					<!-- Hive -->
					<dependency>
						<groupId>org.apache.hive</groupId>
						<artifactId>hive-jdbc</artifactId>
						<version>2.1.0</version>
						<exclusions>
							<exclusion>
								<artifactId>jdk.tools</artifactId>
								<groupId>jdk.tools</groupId>
							</exclusion>
							<exclusion>
								<groupId>org.slf4j</groupId>
								<artifactId>slf4j-api</artifactId>
							</exclusion>
						</exclusions>
					</dependency>

					<!-- A higher version of hbase-client is required to match the server -->
					<dependency>
						<groupId>org.apache.hbase</groupId>
						<artifactId>hbase-client</artifactId>
						<version>1.4.0</version>
						<exclusions>
							<exclusion>
								<groupId>log4j</groupId>
								<artifactId>log4j</artifactId>
							</exclusion>
							<exclusion>
								<groupId>org.slf4j</groupId>
								<artifactId>slf4j-log4j12</artifactId>
							</exclusion>
						</exclusions>
					</dependency>

					<dependency>
						<groupId>org.apache.phoenix</groupId>
						<artifactId>phoenix-core</artifactId>
						<!-- This is the version of our Phoenix jar install -->
						<version>4.7.0-HBase-1.1</version>
						<exclusions>
							<exclusion>
								<groupId>org.apache.hbase</groupId>
								<artifactId>hbase-client</artifactId>
							</exclusion>
							<exclusion>
								<groupId>log4j</groupId>
								<artifactId>log4j</artifactId>
							</exclusion>
							<exclusion>
								<groupId>org.slf4j</groupId>
								<artifactId>slf4j-log4j12</artifactId>
							</exclusion>
							<exclusion>
								<groupId>org.mortbay.jetty</groupId>
								<artifactId>servlet-api-2.5</artifactId>
							</exclusion>
							<exclusion>
								<artifactId>servlet-api</artifactId>
								<groupId>javax.servlet</groupId>
							</exclusion>
							<exclusion>
								<groupId>jdk.tools</groupId>
								<artifactId>jdk.tools</artifactId>
							</exclusion>
							<exclusion>
								<groupId>sqlline</groupId>
								<artifactId>sqlline</artifactId>
							</exclusion>
						</exclusions>
					</dependency>


				</dependencies>

			</project>

Read from Hive and write into MySQL

		import org.apache.spark.sql.hive.HiveContext
		val sqlContext = new HiveContext(sc)
		var db = sqlContext.sql("select * from vkhan.testTable")

		db.take(n).foreach(println)         // n = number of records to print



		db.write.format("jdbc").options(
		  Map("url" ->  "jdbc:mysql://xx.yy.xx.nn:3307schema?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC&user=root&password=root",
		  "dbtable" -> "loadTable",
		   "driver" -> "com.mysql.jdbc.Driver"
		  )).mode("append").save()

Read MySQL in Apache Spark

		val jdbcDF = spark.read.format("jdbc").options(
		  Map("url" ->  "jdbc:mysql://xx.yy.zz.nn:3307/schema?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC&user=root&password=root",
		  "dbtable" -> "radTableName",
		  "fetchSize" -> "10000"
		  )).load()
		  
		  jdbcDF.createOrReplaceTempView("TempTable")


		val sqlDF = spark.sql("select * from TempTable")

Note: We can define either dbtable or query; if you need only a subset of rows or a few columns, use query, as in the sketch below.
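For example, with Spark 2.4+ the query option can push a column/row subset down to MySQL in place of dbtable (a sketch in Java; the URL, table, and column names are placeholders):

		// "query" replaces "dbtable"; the two options cannot be used together
		Dataset<Row> subsetDF = spark.read()
				.format("jdbc")
				.option("url", "jdbc:mysql://xx.yy.zz.nn:3307/schema?serverTimezone=UTC&user=root&password=root")
				.option("query", "select agency_id, route_id from radTableName where route_id <= 3")
				.option("fetchSize", "10000")
				.load();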


Java Example

Spark 2

		public static void main(String[] args) {

			SparkConf sparkConf = new SparkConf();
			SparkContext sc = new SparkContext("local", "spark-mysql-test", sparkConf);
			SQLContext sqlContext = new SQLContext(sc);

			// here you can run sql query
			String sql = "(select * from table1 join table2 on table1.id=table2.table1_id) as test_table";
			// or use an existing table directly
			// String sql = "table1";
			Dataset<Row> dataFrame = sqlContext
				.read()
				.format("jdbc")
				.option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true")
				.option("user", "root")
				.option("password", "password")
				.option("dbtable", sql)
				.load();

			// continue your logical code

		}

Spark 1.6

		public static void main(String[] args) {
			Map<String, String> options = new HashMap<String, String>();
			options.put("url","jdbc:postgresql://<DBURL>:<PORT>/<Database>?user=<UserName>&password=<Password>");
			options.put("dbtable", "<TableName>");
			JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("DBConnection").setMaster("local[*]"));
			SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
			// DataFrame jdbcDF = sqlContext.load("jdbc", options).cache();
			DataFrame jdbcDF = sqlContext.jdbc(options.get("url"),options.get("dbtable"));
			System.out.println("Data------------------->" + jdbcDF.toJSON().first());
			Row[] rows = jdbcDF.collect();
			System.out.println("Without Filter \n ------------------------------------------------- ");
			for (Row row2 : rows) {
				System.out.println(row2.toString());
			}
			System.out.println("Filter Data\n ------------------------------------------------- ");
			jdbcDF = jdbcDF.select("agency_id","route_id").where(jdbcDF.col("route_id").leq(3));
			rows = jdbcDF.collect();
			for (Row row2 : rows) {
				System.out.println(row2.toString());
			}
		}

Create a table with a defined schema in MySQL using Apache Spark

The table will be created automatically when you write the jdbcDf DataFrame into MySQL. If you want to specify the table schema yourself:

		jdbcDf
		 .write
		 .option("createTableColumnTypes", "name VARCHAR(500), age Int(3), address int, name VARCHAR(500)")
		 .jdbc("jdbc:mysql://xx.yy.zz.nn:3306/schema", s"${tablename}", connectionProperties)

Define a primary key in a MySQL table using Apache Spark

Spark's JDBC writer cannot declare a primary key itself; create the table (with its primary key) in MySQL first and then write with mode = append.
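A rough sketch of that pattern in Java, using plain JDBC to create the table and its primary key first (the java.sql imports, exception handling, and all names/URLs are assumptions for illustration):

		// 1) Create the target table with a primary key using plain JDBC
		String url = "jdbc:mysql://xx.yy.zz.nn:3306/schema?user=root&password=root";
		try (Connection conn = DriverManager.getConnection(url);
		     Statement stmt = conn.createStatement()) {
			stmt.executeUpdate("CREATE TABLE IF NOT EXISTS loadTable ("
					+ " id BIGINT NOT NULL PRIMARY KEY,"
					+ " name VARCHAR(500),"
					+ " age INT)");
		}

		// 2) Append from Spark; the existing table and its primary key are kept
		// (the DataFrame is expected to supply the id column)
		jdbcDf.write().mode("append").jdbc(url, "loadTable", connectionProperties);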

Write only selected columns

	url = ...
	table = ...

	# discover the target table's column names
	columns = (sqlContext.read.format('jdbc')
	    .options(url=url, dbtable=table)
	    .load()
	    .columns)

	# append only those columns to the existing table
	forecastFrame.select(*columns).write.jdbc(url=url, dbtable=table, mode='append')
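The same idea as a Java sketch (url, table, connectionProperties, and forecastFrame are assumed to exist, as in the snippet above):

		// Read the target table once just to discover its column names
		Dataset<Row> target = spark.read().format("jdbc")
				.option("url", url)
				.option("dbtable", table)
				.load();

		// Keep only the columns that already exist in the target table, then append
		forecastFrame.selectExpr(target.columns())
				.write().mode("append").jdbc(url, table, connectionProperties);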

The Spark JDBC writer supports the following modes:

- append: Append the contents of the DataFrame to the existing data.
- overwrite: Overwrite the existing data.
- ignore: Silently ignore this operation if data already exists.
- error (default): Throw an exception if data already exists.

We can control the parallelism by calling coalesce() or repartition() depending on the existing number of partitions. Call coalesce when reducing the number of partitions, and repartition when increasing the number of partitions.

		// The number of partitions determines the number of parallel JDBC connections;
		// reduce it with coalesce() or increase it with repartition() before writing.
		df.repartition(10).write.mode(SaveMode.Append).jdbc(jdbcUrl, "test", connectionProperties)
