@@ -237,9 +237,7 @@ public QuorumCnxManager(QuorumPeer self) {
237237 * @param sid
238238 */
239239 public void testInitiateConnection (long sid ) throws Exception {
240- if (LOG .isDebugEnabled ()) {
241- LOG .debug ("Opening channel to server " + sid );
242- }
240+ LOG .debug ("Opening channel to server " + sid );
243241 Socket sock = new Socket ();
244242 setSockOpts (sock );
245243 sock .connect (self .getVotingView ().get (sid ).electionAddr , cnxTO );
@@ -434,17 +432,14 @@ synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr
434432 LOG .debug ("There is a connection already for server " + sid );
435433 return true ;
436434 }
437- try {
438435
439- if ( LOG . isDebugEnabled ()) {
440- LOG . debug ( "Opening channel to server " + sid );
441- }
442- Socket sock = new Socket ();
436+ Socket sock = null ;
437+ try {
438+ LOG . debug ( "Opening channel to server " + sid );
439+ sock = new Socket ();
443440 setSockOpts (sock );
444441 sock .connect (electionAddr , cnxTO );
445- if (LOG .isDebugEnabled ()) {
446- LOG .debug ("Connected to server " + sid );
447- }
442+ LOG .debug ("Connected to server " + sid );
448443 initiateConnection (sock , sid );
449444 return true ;
450445 } catch (UnresolvedAddressException e ) {
@@ -454,11 +449,13 @@ synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr
454449 // detail.
455450 LOG .warn ("Cannot open channel to " + sid
456451 + " at election address " + electionAddr , e );
452+ closeSocket (sock );
457453 throw e ;
458454 } catch (IOException e ) {
459455 LOG .warn ("Cannot open channel to " + sid
460456 + " at election address " + electionAddr ,
461457 e );
458+ closeSocket (sock );
462459 return false ;
463460 }
464461
@@ -574,6 +571,10 @@ private void setSockOpts(Socket sock) throws SocketException {
574571 * Reference to socket
575572 */
576573 private void closeSocket (Socket sock ) {
574+ if (sock == null ) {
575+ return ;
576+ }
577+
577578 try {
578579 sock .close ();
579580 } catch (IOException ie ) {
@@ -614,7 +615,7 @@ public Listener() {
614615 public void run () {
615616 int numRetries = 0 ;
616617 InetSocketAddress addr ;
617-
618+ Socket client = null ;
618619 while ((!shutdown ) && (numRetries < 3 )){
619620 try {
620621 ss = new ServerSocket ();
@@ -632,7 +633,7 @@ public void run() {
632633 setName (addr .toString ());
633634 ss .bind (addr );
634635 while (!shutdown ) {
635- Socket client = ss .accept ();
636+ client = ss .accept ();
636637 setSockOpts (client );
637638 LOG .info ("Received connection request "
638639 + client .getRemoteSocketAddress ());
@@ -654,6 +655,7 @@ public void run() {
654655 LOG .error ("Interrupted while sleeping. " +
655656 "Ignoring exception" , ie );
656657 }
658+ closeSocket (client );
657659 }
658660 }
659661 LOG .info ("Leaving listener" );
@@ -739,9 +741,7 @@ synchronized RecvWorker getRecvWorker(){
739741 }
740742
741743 synchronized boolean finish () {
742- if (LOG .isDebugEnabled ()) {
743- LOG .debug ("Calling finish for " + sid );
744- }
744+ LOG .debug ("Calling finish for " + sid );
745745
746746 if (!running ){
747747 /*
@@ -752,16 +752,14 @@ synchronized boolean finish() {
752752
753753 running = false ;
754754 closeSocket (sock );
755- // channel = null;
756755
757756 this .interrupt ();
758757 if (recvWorker != null ) {
759758 recvWorker .finish ();
760759 }
761760
762- if (LOG .isDebugEnabled ()) {
763- LOG .debug ("Removing entry from senderWorkerMap sid=" + sid );
764- }
761+ LOG .debug ("Removing entry from senderWorkerMap sid=" + sid );
762+
765763 senderWorkerMap .remove (sid , this );
766764 threadCnt .decrementAndGet ();
767765 return running ;
@@ -919,9 +917,7 @@ public void run() {
919917 } finally {
920918 LOG .warn ("Interrupting SendWorker" );
921919 sw .finish ();
922- if (sock != null ) {
923- closeSocket (sock );
924- }
920+ closeSocket (sock );
925921 }
926922 }
927923 }
0 commit comments